From 5a48a2fb1819fe4eeee5a09c50447c600f7ae6d9 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 1 Dec 2022 11:34:43 +0100 Subject: [PATCH 1/4] implemented synch for single mdstore --- .../eu/dnetlib/dhp/common/MDStoreInfo.java | 100 ++++++++++ .../eu/dnetlib/dhp/common/MdstoreClient.java | 38 +++- .../dnetlib/dhp/common/MdStoreClientTest.java | 36 ++++ .../raw/MigrateMongoMdstoresApplication.java | 183 ++++++++++++++++-- .../common/AbstractMigrationApplication.java | 15 +- .../migrate_mongo_mstores_parameters.json | 6 + .../oa/graph/raw_all/oozie_app/workflow.xml | 3 + .../oa/graph/raw_step1/oozie_app/workflow.xml | 61 +----- 8 files changed, 349 insertions(+), 93 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java create mode 100644 dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java new file mode 100644 index 000000000..bd1ccca50 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java @@ -0,0 +1,100 @@ + +package eu.dnetlib.dhp.common; + +/** + * This utility represent the Metadata Store information + * needed during the migration from mongo to HDFS to store + */ +public class MDStoreInfo { + private String mdstore; + private String currentId; + private Long latestTimestamp; + + /** + * Instantiates a new Md store info. + */ + public MDStoreInfo() { + } + + /** + * Instantiates a new Md store info. + * + * @param mdstore the mdstore + * @param currentId the current id + * @param latestTimestamp the latest timestamp + */ + public MDStoreInfo(String mdstore, String currentId, Long latestTimestamp) { + this.mdstore = mdstore; + this.currentId = currentId; + this.latestTimestamp = latestTimestamp; + } + + /** + * Gets mdstore. + * + * @return the mdstore + */ + public String getMdstore() { + return mdstore; + } + + /** + * Sets mdstore. + * + * @param mdstore the mdstore + * @return the mdstore + */ + public MDStoreInfo setMdstore(String mdstore) { + this.mdstore = mdstore; + return this; + } + + /** + * Gets current id. + * + * @return the current id + */ + public String getCurrentId() { + return currentId; + } + + /** + * Sets current id. + * + * @param currentId the current id + * @return the current id + */ + public MDStoreInfo setCurrentId(String currentId) { + this.currentId = currentId; + return this; + } + + /** + * Gets latest timestamp. + * + * @return the latest timestamp + */ + public Long getLatestTimestamp() { + return latestTimestamp; + } + + /** + * Sets latest timestamp. 
+	 *
+	 * @param latestTimestamp the latest timestamp
+	 * @return the latest timestamp
+	 */
+	public MDStoreInfo setLatestTimestamp(Long latestTimestamp) {
+		this.latestTimestamp = latestTimestamp;
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return "MDStoreInfo{" +
+			"mdstore='" + mdstore + '\'' +
+			", currentId='" + currentId + '\'' +
+			", latestTimestamp=" + latestTimestamp +
+			'}';
+	}
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java
index d06544ae1..34aa37be5 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java
@@ -1,12 +1,12 @@
 
 package eu.dnetlib.dhp.common;
 
+import static com.mongodb.client.model.Sorts.descending;
+
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +38,26 @@ public class MdstoreClient implements Closeable {
 		this.db = getDb(client, dbName);
 	}
 
+	private Long parseTimestamp(Document f) {
+		if (f == null || !f.containsKey("timestamp"))
+			return null;
+
+		Object ts = f.get("timestamp");
+
+		return Long.parseLong(ts.toString());
+	}
+
+	public Long getLatestTimestamp(final String collectionId) {
+		MongoCollection<Document> collection = db.getCollection(collectionId);
+		FindIterable<Document> result = collection.find().sort(descending("timestamp")).limit(1);
+		if (result == null) {
+			return null;
+		}
+
+		Document f = result.first();
+		return parseTimestamp(f);
+	}
+
 	public MongoCollection<Document> mdStore(final String mdId) {
 		BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
 
@@ -54,6 +74,16 @@ public class MdstoreClient implements Closeable {
 		return getColl(db, currentId, true);
 	}
 
+	public List<MDStoreInfo> mdStoreWithTimestamp(final String mdFormat, final String mdLayout,
+		final String mdInterpretation) {
+		Map<String, String> res = validCollections(mdFormat, mdLayout, mdInterpretation);
+		return res
+			.entrySet()
+			.stream()
+			.map(e -> new MDStoreInfo(e.getKey(), e.getValue(), getLatestTimestamp(e.getValue())))
+			.collect(Collectors.toList());
+	}
+
 	public Map<String, String> validCollections(
 		final String mdFormat, final String mdLayout, final String mdInterpretation) {
 
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java
new file mode 100644
index 000000000..f38d04979
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java
@@ -0,0 +1,36 @@
+
+package eu.dnetlib.dhp.common;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class MdStoreClientTest {
+
+	@Test
+	public void testMongoCollection() throws IOException {
+		final MdstoreClient client = new MdstoreClient("mongodb://localhost:27017", "mdstore");
+
+		final ObjectMapper mapper = new ObjectMapper();
+
+		final List<MDStoreInfo> infos = client.mdStoreWithTimestamp("ODF", "store", "cleaned");
+
+		infos.forEach(System.out::println);
+
+		final String s =
+			mapper.writeValueAsString(infos);
+
+		Path fileName = Paths.get("/Users/sandro/mdstore_info.json");
+
+		// Writing into the file
+		Files.write(fileName, s.getBytes(StandardCharsets.UTF_8));
+
+	}
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
index 6dbab96cb..57ca9b4c4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
@@ -1,31 +1,87 @@
 
 package eu.dnetlib.dhp.oa.graph.raw;
 
+import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
+
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.MDStoreInfo;
 import eu.dnetlib.dhp.common.MdstoreClient;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
 
 public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
 	private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplication.class);
 	private final MdstoreClient mdstoreClient;
 
+	private static List<MDStoreInfo> snapshotsMDStores(final MdstoreClient client,
+		final String format,
+		final String layout,
+		final String interpretation) {
+		return client.mdStoreWithTimestamp(format, layout, interpretation);
+	}
+
+	private static MDStoreInfo extractPath(final String path, final String basePath) {
+		int res = path.indexOf(basePath);
+		if (res > 0) {
+			String[] split = path.substring(res).split("/");
+			if (split.length > 2) {
+				final String ts = split[split.length - 1];
+				final String mdStore = split[split.length - 2];
+				return new MDStoreInfo(mdStore, null, Long.parseLong(ts));
+			}
+		}
+		return null;
+	}
+
+	private static Map<String, MDStoreInfo> hdfsMDStoreInfo(FileSystem fs, final String basePath) throws IOException {
+		final Map<String, MDStoreInfo> hdfs_store = new HashMap<>();
+		final Path p = new Path(basePath);
+		final RemoteIterator<LocatedFileStatus> ls = fs.listFiles(p, true);
+		while (ls.hasNext()) {
+
+			String current = ls.next().getPath().toString();
+
+			final MDStoreInfo info = extractPath(current, basePath);
+			if (info != null) {
+				hdfs_store.put(info.getMdstore(), info);
+			}
+		}
+		return hdfs_store;
+	}
+
+	private static String createMDStoreDir(final String basePath, final String mdStoreId) {
+		if (basePath.endsWith("/")) {
+			return basePath + mdStoreId;
+		} else {
+			return String.format("%s/%s", basePath, mdStoreId);
+		}
+	}
+
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
-					MigrateMongoMdstoresApplication.class
-						.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json")));
+					Objects
+						.requireNonNull(
+							MigrateMongoMdstoresApplication.class
+								.getResourceAsStream(
+									"/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json"))));
parser.parseArgument(args); final String mongoBaseUrl = parser.get("mongoBaseUrl"); @@ -36,30 +92,115 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio final String mdInterpretation = parser.get("mdInterpretation"); final String hdfsPath = parser.get("hdfsPath"); + final String nameNode = parser.get("nameNode"); - try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl, - mongoDb)) { - app.execute(mdFormat, mdLayout, mdInterpretation); - } + final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(nameNode)); + + final MdstoreClient mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); + + final List mongoMDStores = snapshotsMDStores(mdstoreClient, mdFormat, mdLayout, mdInterpretation); + + final Map hdfsMDStores = hdfsMDStoreInfo(fileSystem, hdfsPath); + + mongoMDStores + .stream() + .filter(currentMDStore -> currentMDStore.getLatestTimestamp() != null) + .forEach( + consumeMDStore( + mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, hdfsMDStores)); } - public MigrateMongoMdstoresApplication( - final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception { + /** + * This method is responsible to synch only the mongoMDStore that changed since last time + * @param mdFormat the MDStore's format + * @param mdLayout the MDStore'slayout + * @param mdInterpretation the MDStore's interpretation + * @param hdfsPath the basePath into hdfs where all MD-stores are stored + * @param fileSystem The Hadoop File system client + * @param hdfsMDStores A Map containing as Key the mdstore ID and as value the @{@link MDStoreInfo} + * @return + */ + private static Consumer consumeMDStore(String mdFormat, String mdLayout, String mdInterpretation, + String hdfsPath, FileSystem fileSystem, final String mongoBaseUrl, final String mongoDb, + Map hdfsMDStores) { + return currentMDStore -> { + // If the key is missing it means that the mdstore is not present in hdfs + // that is the hdfs path basePath/MDSTOREID/timestamp is missing + // So we have to synch it + if (!hdfsMDStores.containsKey(currentMDStore.getMdstore())) { + log.info("Adding store " + currentMDStore.getMdstore()); + try { + synchMDStoreIntoHDFS( + mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, + currentMDStore); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + final MDStoreInfo current = hdfsMDStores.get(currentMDStore.getMdstore()); + // IF the key is present it means that in hdfs we have a path + // basePath/MDSTOREID/timestamp but the timestamp on hdfs is older that the + // new one in mongo so we have to synch the new mdstore and delete the old one + if (currentMDStore.getLatestTimestamp() > current.getLatestTimestamp()) { + log.info("Updating MDStore " + currentMDStore.getMdstore()); + final String mdstoreDir = createMDStoreDir(hdfsPath, currentMDStore.getMdstore()); + final String rmPath = createMDStoreDir(mdstoreDir, current.getLatestTimestamp().toString()); + try { + synchMDStoreIntoHDFS( + mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, + currentMDStore); + log.info("deleting " + rmPath); + // DELETE THE OLD MDSTORE + fileSystem.delete(new Path(rmPath), true); + } catch (IOException e) { + throw new RuntimeException("Unable to synch and remove path " + rmPath, e); + } + } + } + }; + } + + /** + *This methos store into hdfs all the mongo record of a single mdstore + * + * @param mdFormat the 
MDStore's format + * @param mdLayout the MDStore'slayout + * @param mdInterpretation the MDStore's interpretation + * @param hdfsPath the basePath into hdfs where all MD-stores are stored + * @param fileSystem The Hadoop File system client + * @param currentMDStore The current Mongo MDStore ID + * @throws IOException + */ + private static void synchMDStoreIntoHDFS(String mdFormat, String mdLayout, String mdInterpretation, String hdfsPath, + FileSystem fileSystem, final String mongoBaseUrl, final String mongoDb, MDStoreInfo currentMDStore) + throws IOException { + // FIRST CREATE the directory basePath/MDSTOREID + final String mdstoreDir = createMDStoreDir(hdfsPath, currentMDStore.getMdstore()); + fileSystem.mkdirs(new Path(mdstoreDir)); + // Then synch all the records into basePath/MDSTOREID/timestamp + final String currentIdDir = createMDStoreDir(mdstoreDir, currentMDStore.getLatestTimestamp().toString()); + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(mongoBaseUrl, mongoDb, + currentIdDir)) { + app.execute(currentMDStore.getCurrentId(), mdFormat, mdLayout, mdInterpretation); + } catch (Exception e) { + throw new RuntimeException( + String + .format("Error on sync mdstore with ID %s into path %s", currentMDStore.getMdstore(), currentIdDir), + e); + } + log.info(String.format("Synchronized mdStore id : %s into path %s", currentMDStore.getMdstore(), currentIdDir)); + } + + public MigrateMongoMdstoresApplication(final String mongoBaseUrl, final String mongoDb, final String hdfsPath) + throws Exception { super(hdfsPath); this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); } - public void execute(final String format, final String layout, final String interpretation) { - final Map colls = mdstoreClient.validCollections(format, layout, interpretation); - log.info("Found {} mdstores", colls.size()); - - for (final Entry entry : colls.entrySet()) { - log.info("Processing mdstore {} (collection: {})", entry.getKey(), entry.getValue()); - final String currentColl = entry.getValue(); - - for (final String xml : mdstoreClient.listRecords(currentColl)) { - emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); - } + public void execute(final String currentColl, final String format, final String layout, + final String interpretation) { + for (final String xml : mdstoreClient.listRecords(currentColl)) { + emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 6f63e9327..b0dc7d7e7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -3,33 +3,20 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Text; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.raw.OafToOafMapper; -import eu.dnetlib.dhp.oa.graph.raw.OdfToOafMapper; -import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; public class AbstractMigrationApplication implements Closeable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json index ee1a6ac4e..b505b7fe0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json @@ -5,6 +5,12 @@ "paramDescription": "the path where storing the sequential file", "paramRequired": true }, + { + "paramName": "n", + "paramLongName": "nameNode", + "paramDescription": "the hdfs Name node url", + "paramRequired": true + }, { "paramName": "mongourl", "paramLongName": "mongoBaseUrl", diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 8262c6923..069dd09f7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -301,6 +301,7 @@ --mdFormatODF --mdLayoutstore --mdInterpretationcleaned + --nameNode${nameNode} @@ -326,6 +327,7 @@ --mdFormatOAF --mdLayoutstore --mdInterpretationcleaned + --nameNode${nameNode} @@ -343,6 +345,7 @@ --mdFormatOAF --mdLayoutstore --mdInterpretationintersection + --nameNode${nameNode} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml index 9b68cfb05..c57371560 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml @@ -4,18 +4,6 @@ migrationPathStep1 the base path to store hdfs file - - postgresURL - the postgres URL to access to the database - - - postgresUser - the user postgres - - - postgresPassword - the password postgres - mongoURL mongoDB url, example: mongodb://[username:password@]host[:port] @@ -24,27 +12,6 @@ mongoDb mongo database - - isLookupUrl - the address of the lookUp service - - - nsPrefixBlacklist - - a blacklist of nsprefixes (comma separeted) - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - @@ -58,37 +25,22 @@ - - - - - - - ${jobTracker} - ${nameNode} - 
eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication - -p${migrationPathStep1}/db_records - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - -islookup${isLookupUrl} - --nsPrefixBlacklist${nsPrefixBlacklist} - - ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationPathStep1}/odf_records + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${migrationPathStep1} -mongourl${mongoURL} -mongodb${mongoDb} -fODF -lstore -icleaned + --nameNode${nameNode} + @@ -98,13 +50,14 @@ ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationPathStep1}/oaf_records + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${migrationPathStep1} -mongourl${mongoURL} -mongodb${mongoDb} -fOAF -lstore -icleaned + --nameNode${nameNode} From 5e4866d03312ba524aa9eb30f1fe337f97aad740 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 12 Dec 2022 11:29:46 +0100 Subject: [PATCH 2/4] implemented synch for single mdstore --- .../dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index 57ca9b4c4..0c8903672 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -108,6 +108,9 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio .forEach( consumeMDStore( mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, hdfsMDStores)); + + // TODO: DELETE MDStORE FOLDER NOT PRESENT IN MONGO + } /** From 0b9819f1ab4ad4f28f124e853ad793e601930ded Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 8 Feb 2023 10:32:33 +0100 Subject: [PATCH 3/4] Code formatted --- .../oaf/utils/GraphCleaningFunctions.java | 4 +-- .../ebi/SparkCreateBaselineDataFrame.scala | 3 +- .../dnetlib/dhp/sx/bio/BioScholixTest.scala | 32 ++++++++++++------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index b24daaa5d..fc515b5b1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -13,6 +13,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.collect.Lists; @@ -23,8 +25,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import me.xuender.unidecode.Unidecode; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; public class GraphCleaningFunctions extends CleaningFunctions { diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala 
b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 87116f00a..8ac8b00bf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -27,7 +27,8 @@ object SparkCreateBaselineDataFrame { def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = { val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/") - val result = data.linesWithSeparators.map(l =>l.stripLineEnd) + val result = data.linesWithSeparators + .map(l => l.stripLineEnd) .filter(l => l.startsWith("") diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala index 24caaa553..d1611300d 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala @@ -63,7 +63,9 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump")) .mkString - val r: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).toList + val r: List[Oaf] = records.linesWithSeparators + .map(l => l.stripLineEnd) + .toList .map(s => mapper.readValue(s, classOf[PMArticle])) .map(a => PubMedToOaf.convert(a, vocabularies)) assertEquals(10, r.size) @@ -173,9 +175,10 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pdb_dump")) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) - val result: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).toList.flatMap(o => BioDBToOAF.pdbTOOaf(o)) + val result: List[Oaf] = + records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.pdbTOOaf(o)) assertTrue(result.nonEmpty) result.foreach(r => assertNotNull(r)) @@ -194,9 +197,10 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump")) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) - val result: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).toList.flatMap(o => BioDBToOAF.uniprotToOAF(o)) + val result: List[Oaf] = + records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.uniprotToOAF(o)) assertTrue(result.nonEmpty) result.foreach(r => assertNotNull(r)) @@ -239,9 +243,10 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/crossref_links")) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) - val result: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).map(s => 
BioDBToOAF.crossrefLinksToOaf(s)).toList + val result: List[Oaf] = + records.linesWithSeparators.map(l => l.stripLineEnd).map(s => BioDBToOAF.crossrefLinksToOaf(s)).toList assertNotNull(result) assertTrue(result.nonEmpty) @@ -276,14 +281,17 @@ class BioScholixTest extends AbstractVocabularyTest { getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved") ) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - val l: List[ScholixResolved] = records.linesWithSeparators.map(l =>l.stripLineEnd).map { input => - lazy val json = parse(input) - json.extract[ScholixResolved] - }.toList + val l: List[ScholixResolved] = records.linesWithSeparators + .map(l => l.stripLineEnd) + .map { input => + lazy val json = parse(input) + json.extract[ScholixResolved] + } + .toList val result: List[Oaf] = l.map(s => BioDBToOAF.scholixResolvedToOAF(s)) From 8920932dd8a9428f1875451069154267733e33e5 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 8 Feb 2023 11:34:18 +0100 Subject: [PATCH 4/4] Code formatted --- .../crossref/CrossrefMappingTest.scala | 4 +- .../orcid/MappingORCIDToOAFTest.scala | 8 ++-- .../doiboost/uw/UnpayWallMappingTest.scala | 4 +- .../raw/MigrateMongoMdstoresApplication.java | 4 +- .../dhp/oa/graph/clean/CleanCountryTest.java | 41 +++++++++---------- .../resolution/ResolveEntitiesTest.scala | 8 ++-- .../sx/graph/scholix/ScholixGraphTest.scala | 5 ++- 7 files changed, 39 insertions(+), 35 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala index aba8cee12..12a61454d 100644 --- a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala @@ -31,13 +31,13 @@ class CrossrefMappingTest { .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/funder_doi")) .mkString - for (line <- funder_doi.linesWithSeparators.map(l =>l.stripLineEnd)) { + for (line <- funder_doi.linesWithSeparators.map(l => l.stripLineEnd)) { val json = template.replace("%s", line) val resultList: List[Oaf] = Crossref2Oaf.convert(json) assertTrue(resultList.nonEmpty) checkRelation(resultList) } - for (line <- funder_name.linesWithSeparators.map(l =>l.stripLineEnd)) { + for (line <- funder_name.linesWithSeparators.map(l => l.stripLineEnd)) { val json = template.replace("%s", line) val resultList: List[Oaf] = Crossref2Oaf.convert(json) assertTrue(resultList.nonEmpty) diff --git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala index d7a6a94a5..8033f02fb 100644 --- a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala @@ -25,9 +25,11 @@ class MappingORCIDToOAFTest { .mkString assertNotNull(json) assertFalse(json.isEmpty) - json.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => { - assertNotNull(ORCIDToOAF.extractValueFromInputString(s)) - }) + 
json.linesWithSeparators + .map(l => l.stripLineEnd) + .foreach(s => { + assertNotNull(ORCIDToOAF.extractValueFromInputString(s)) + }) } @Test diff --git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala index 7fe0e9935..30001acb5 100644 --- a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala @@ -22,7 +22,7 @@ class UnpayWallMappingTest { .mkString var i: Int = 0 - for (line <- Ilist.linesWithSeparators.map(l =>l.stripLineEnd)) { + for (line <- Ilist.linesWithSeparators.map(l => l.stripLineEnd)) { val p = UnpayWallToOAF.convertToOAF(line) if (p != null) { @@ -43,7 +43,7 @@ class UnpayWallMappingTest { i = i + 1 } - val l = Ilist.linesWithSeparators.map(l =>l.stripLineEnd).next() + val l = Ilist.linesWithSeparators.map(l => l.stripLineEnd).next() val item = UnpayWallToOAF.convertToOAF(l) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index 0c8903672..4949cc627 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -114,7 +114,7 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio } /** - * This method is responsible to synch only the mongoMDStore that changed since last time + * This method is responsible to sync only the stores that have been changed since last time * @param mdFormat the MDStore's format * @param mdLayout the MDStore'slayout * @param mdInterpretation the MDStore's interpretation @@ -164,7 +164,7 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio } /** - *This methos store into hdfs all the mongo record of a single mdstore + *This method store into hdfs all the MONGO record of a single mdstore into the HDFS File * * @param mdFormat the MDStore's format * @param mdLayout the MDStore'slayout diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java index de9e4fc90..3bc69cfd1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import eu.dnetlib.dhp.schema.oaf.Dataset; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; +import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Publication; public class CleanCountryTest { @@ -151,41 +151,40 @@ public class CleanCountryTest { @Test public void testDatasetClean() throws Exception { final String sourcePath = getClass() - 
.getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json") + .getPath(); spark - .read() - .textFile(sourcePath) - .map( - (MapFunction) r -> OBJECT_MAPPER.readValue(r, Dataset.class), - Encoders.bean(Dataset.class)) - .write() - .json(workingDir.toString() + "/dataset"); + .read() + .textFile(sourcePath) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Dataset.class), + Encoders.bean(Dataset.class)) + .write() + .json(workingDir.toString() + "/dataset"); CleanCountrySparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/dataset", - "-graphTableClassName", Dataset.class.getCanonicalName(), - "-workingDir", workingDir.toString() + "/working", - "-country", "NL", - "-verifyParam", "10.17632", - "-collectedfrom", "NARCIS", - "-hostedBy", getClass() + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", workingDir.toString() + "/dataset", + "-graphTableClassName", Dataset.class.getCanonicalName(), + "-workingDir", workingDir.toString() + "/working", + "-country", "NL", + "-verifyParam", "10.17632", + "-collectedfrom", "NARCIS", + "-hostedBy", getClass() .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") .getPath() }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dataset") - .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); Assertions.assertEquals(1, tmp.count()); Assertions.assertEquals(0, tmp.first().getCountry().size()); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala index d415b7fc9..022168de5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala @@ -53,7 +53,8 @@ class ResolveEntitiesTest extends Serializable { def generateUpdates(spark: SparkSession): Unit = { val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString - val pids: List[String] = template.linesWithSeparators.map(l =>l.stripLineEnd) + val pids: List[String] = template.linesWithSeparators + .map(l => l.stripLineEnd) .map { id => val r = new Result r.setId(id.toLowerCase.trim) @@ -127,7 +128,7 @@ class ResolveEntitiesTest extends Serializable { entities.foreach { e => val template = Source.fromInputStream(this.getClass.getResourceAsStream(s"$e")).mkString spark - .createDataset(spark.sparkContext.parallelize(template.linesWithSeparators.map(l =>l.stripLineEnd).toList)) + .createDataset(spark.sparkContext.parallelize(template.linesWithSeparators.map(l => l.stripLineEnd).toList)) .as[String] .write .option("compression", "gzip") @@ -264,7 +265,8 @@ class ResolveEntitiesTest extends Serializable { Source .fromInputStream(this.getClass.getResourceAsStream(s"publication")) .mkString - .linesWithSeparators.map(l =>l.stripLineEnd) + .linesWithSeparators + .map(l => l.stripLineEnd) .next(), classOf[Publication] ) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala 
b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala index 0ea908290..b838ae065 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala @@ -47,7 +47,7 @@ class ScholixGraphTest extends AbstractVocabularyTest { val inputRelations = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/oaf_to_summary")) .mkString - val items = inputRelations.linesWithSeparators.map(l =>l.stripLineEnd).toList + val items = inputRelations.linesWithSeparators.map(l => l.stripLineEnd).toList assertNotNull(items) items.foreach(i => assertTrue(i.nonEmpty)) val result = @@ -69,7 +69,8 @@ class ScholixGraphTest extends AbstractVocabularyTest { getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/merge_result_scholix") ) .mkString - val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators.map(l =>l.stripLineEnd) + val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators + .map(l => l.stripLineEnd) .sliding(2) .map(s => (s.head, s(1))) .map(p => (mapper.readValue(p._1, classOf[Relation]), mapper.readValue(p._2, classOf[ScholixSummary])))
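
For reference, the synchronisation rule implemented by consumeMDStore in patch 1 reduces to a per-MDStore timestamp comparison: add the store when it is missing on HDFS, refresh it when the Mongo snapshot is newer, otherwise skip it. The following is a minimal standalone sketch of that rule only; SyncAction and decide() are illustrative names, not part of the dnet-hadoop codebase, and the plain long timestamps stand in for MDStoreInfo.getLatestTimestamp() (Mongo side) and the basePath/MDSTOREID/timestamp folder name (HDFS side).

import java.util.Map;

// Illustrative sketch of the per-MDStore sync decision; not part of dnet-hadoop.
public class MdStoreSyncDecisionSketch {

	enum SyncAction {
		ADD, // store not yet on HDFS: copy it
		REFRESH, // Mongo snapshot is newer: copy it and delete the old folder
		SKIP // HDFS already holds the latest snapshot
	}

	// Same decision as consumeMDStore: compare the Mongo timestamp with the HDFS one, if any.
	static SyncAction decide(String mdstoreId, long mongoTimestamp, Map<String, Long> hdfsTimestamps) {
		final Long hdfsTimestamp = hdfsTimestamps.get(mdstoreId);
		if (hdfsTimestamp == null) {
			return SyncAction.ADD;
		}
		return mongoTimestamp > hdfsTimestamp ? SyncAction.REFRESH : SyncAction.SKIP;
	}

	public static void main(String[] args) {
		final Map<String, Long> onHdfs = Map.of("md-1", 100L, "md-2", 200L);
		System.out.println(decide("md-1", 150L, onHdfs)); // REFRESH: Mongo is newer
		System.out.println(decide("md-2", 200L, onHdfs)); // SKIP: already up to date
		System.out.println(decide("md-3", 50L, onHdfs)); // ADD: not yet on HDFS
	}
}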