diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java new file mode 100644 index 000000000..bd1ccca50 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MDStoreInfo.java @@ -0,0 +1,100 @@ + +package eu.dnetlib.dhp.common; + +/** + * This utility represents the Metadata Store information + * needed during the migration of the Metadata Stores from MongoDB to HDFS. + */ +public class MDStoreInfo { + private String mdstore; + private String currentId; + private Long latestTimestamp; + + /** + * Instantiates a new Md store info. + */ + public MDStoreInfo() { + } + + /** + * Instantiates a new Md store info. + * + * @param mdstore the mdstore + * @param currentId the current id + * @param latestTimestamp the latest timestamp + */ + public MDStoreInfo(String mdstore, String currentId, Long latestTimestamp) { + this.mdstore = mdstore; + this.currentId = currentId; + this.latestTimestamp = latestTimestamp; + } + + /** + * Gets mdstore. + * + * @return the mdstore + */ + public String getMdstore() { + return mdstore; + } + + /** + * Sets mdstore. + * + * @param mdstore the mdstore + * @return this MDStoreInfo instance, to allow method chaining + */ + public MDStoreInfo setMdstore(String mdstore) { + this.mdstore = mdstore; + return this; + } + + /** + * Gets current id. + * + * @return the current id + */ + public String getCurrentId() { + return currentId; + } + + /** + * Sets current id. + * + * @param currentId the current id + * @return this MDStoreInfo instance, to allow method chaining + */ + public MDStoreInfo setCurrentId(String currentId) { + this.currentId = currentId; + return this; + } + + /** + * Gets latest timestamp. + * + * @return the latest timestamp + */ + public Long getLatestTimestamp() { + return latestTimestamp; + } + + /** + * Sets latest timestamp. 
+ * + * @param latestTimestamp the latest timestamp + * @return the latest timestamp + */ + public MDStoreInfo setLatestTimestamp(Long latestTimestamp) { + this.latestTimestamp = latestTimestamp; + return this; + } + + @Override + public String toString() { + return "MDStoreInfo{" + + "mdstore='" + mdstore + '\'' + + ", currentId='" + currentId + '\'' + + ", latestTimestamp=" + latestTimestamp + + '}'; + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java index d06544ae1..34aa37be5 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java @@ -1,12 +1,12 @@ package eu.dnetlib.dhp.common; +import static com.mongodb.client.model.Sorts.descending; + import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import java.util.*; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.commons.lang3.StringUtils; @@ -38,6 +38,26 @@ public class MdstoreClient implements Closeable { this.db = getDb(client, dbName); } + private Long parseTimestamp(Document f) { + if (f == null || !f.containsKey("timestamp")) + return null; + + Object ts = f.get("timestamp"); + + return Long.parseLong(ts.toString()); + } + + public Long getLatestTimestamp(final String collectionId) { + MongoCollection collection = db.getCollection(collectionId); + FindIterable result = collection.find().sort(descending("timestamp")).limit(1); + if (result == null) { + return null; + } + + Document f = result.first(); + return parseTimestamp(f); + } + public MongoCollection mdStore(final String mdId) { BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get(); @@ -54,6 +74,16 @@ public class MdstoreClient implements Closeable { return getColl(db, currentId, 
true); } + public List mdStoreWithTimestamp(final String mdFormat, final String mdLayout, + final String mdInterpretation) { + Map res = validCollections(mdFormat, mdLayout, mdInterpretation); + return res + .entrySet() + .stream() + .map(e -> new MDStoreInfo(e.getKey(), e.getValue(), getLatestTimestamp(e.getValue()))) + .collect(Collectors.toList()); + } + public Map validCollections( final String mdFormat, final String mdLayout, final String mdInterpretation) { diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java new file mode 100644 index 000000000..f38d04979 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/MdStoreClientTest.java @@ -0,0 +1,36 @@ + +package eu.dnetlib.dhp.common; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class MdStoreClientTest { + + @Test + public void testMongoCollection() throws IOException { + final MdstoreClient client = new MdstoreClient("mongodb://localhost:27017", "mdstore"); + + final ObjectMapper mapper = new ObjectMapper(); + + final List infos = client.mdStoreWithTimestamp("ODF", "store", "cleaned"); + + infos.forEach(System.out::println); + + final String s = mapper.writeValueAsString(infos); + + Path fileName = Paths.get("/Users/sandro/mdstore_info.json"); + + // Writing into the file + Files.write(fileName, s.getBytes(StandardCharsets.UTF_8)); + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index 6dbab96cb..4949cc627 
100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -1,31 +1,87 @@ package eu.dnetlib.dhp.oa.graph.raw; +import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration; + import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.Objects; +import java.util.function.Consumer; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.MDStoreInfo; import eu.dnetlib.dhp.common.MdstoreClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable { private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplication.class); - private final MdstoreClient mdstoreClient; + private static List snapshotsMDStores(final MdstoreClient client, + final String format, + final String layout, + final String interpretation) { + return client.mdStoreWithTimestamp(format, layout, interpretation); + } + + private static MDStoreInfo extractPath(final String path, final String basePath) { + int res = path.indexOf(basePath); + if (res > 0) { + String[] split = path.substring(res).split("/"); + if (split.length > 2) { + final String ts = split[split.length - 1]; + final String mdStore = split[split.length - 2]; + return new MDStoreInfo(mdStore, null, Long.parseLong(ts)); + } + } + return null; + } + + private static Map hdfsMDStoreInfo(FileSystem fs, final 
String basePath) throws IOException { + final Map hdfs_store = new HashMap<>(); + final Path p = new Path(basePath); + final RemoteIterator ls = fs.listFiles(p, true); + while (ls.hasNext()) { + + String current = ls.next().getPath().toString(); + + final MDStoreInfo info = extractPath(current, basePath); + if (info != null) { + hdfs_store.put(info.getMdstore(), info); + } + } + return hdfs_store; + } + + private static String createMDStoreDir(final String basePath, final String mdStoreId) { + if (basePath.endsWith("/")) { + return basePath + mdStoreId; + } else { + return String.format("%s/%s", basePath, mdStoreId); + } + } + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - MigrateMongoMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json"))); + Objects + .requireNonNull( + MigrateMongoMdstoresApplication.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json")))); parser.parseArgument(args); final String mongoBaseUrl = parser.get("mongoBaseUrl"); @@ -36,30 +92,118 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio final String mdInterpretation = parser.get("mdInterpretation"); final String hdfsPath = parser.get("hdfsPath"); + final String nameNode = parser.get("nameNode"); + + final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(nameNode)); + + final MdstoreClient mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); + + final List mongoMDStores = snapshotsMDStores(mdstoreClient, mdFormat, mdLayout, mdInterpretation); + + final Map hdfsMDStores = hdfsMDStoreInfo(fileSystem, hdfsPath); + + mongoMDStores + .stream() + .filter(currentMDStore -> currentMDStore.getLatestTimestamp() != null) + .forEach( + consumeMDStore( + mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, 
hdfsMDStores)); + + // TODO: DELETE MDStORE FOLDER NOT PRESENT IN MONGO - try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl, - mongoDb)) { - app.execute(mdFormat, mdLayout, mdInterpretation); - } } - public MigrateMongoMdstoresApplication( - final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception { + /** + * This method is responsible for syncing only the stores that have been changed since the last run + * @param mdFormat the MDStore's format + * @param mdLayout the MDStore's layout + * @param mdInterpretation the MDStore's interpretation + * @param hdfsPath the basePath into hdfs where all MD-stores are stored + * @param fileSystem The Hadoop File system client + * @param hdfsMDStores A Map containing as Key the mdstore ID and as value the {@link MDStoreInfo} + * @return a Consumer that synchronizes a single mongo MDStore into HDFS + */ + private static Consumer consumeMDStore(String mdFormat, String mdLayout, String mdInterpretation, + String hdfsPath, FileSystem fileSystem, final String mongoBaseUrl, final String mongoDb, + Map hdfsMDStores) { + return currentMDStore -> { + // If the key is missing it means that the mdstore is not present in hdfs + // that is the hdfs path basePath/MDSTOREID/timestamp is missing + // So we have to synch it + if (!hdfsMDStores.containsKey(currentMDStore.getMdstore())) { + log.info("Adding store " + currentMDStore.getMdstore()); + try { + synchMDStoreIntoHDFS( + mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, + currentMDStore); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + final MDStoreInfo current = hdfsMDStores.get(currentMDStore.getMdstore()); + // IF the key is present it means that in hdfs we have a path + // basePath/MDSTOREID/timestamp but the timestamp on hdfs is older than the + // new one in mongo so we have to synch the new mdstore and delete the old one + if (currentMDStore.getLatestTimestamp() > current.getLatestTimestamp()) { + 
log.info("Updating MDStore " + currentMDStore.getMdstore()); + final String mdstoreDir = createMDStoreDir(hdfsPath, currentMDStore.getMdstore()); + final String rmPath = createMDStoreDir(mdstoreDir, current.getLatestTimestamp().toString()); + try { + synchMDStoreIntoHDFS( + mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, + currentMDStore); + log.info("deleting " + rmPath); + // DELETE THE OLD MDSTORE + fileSystem.delete(new Path(rmPath), true); + } catch (IOException e) { + throw new RuntimeException("Unable to synch and remove path " + rmPath, e); + } + } + } + }; + } + + /** + * This method stores all the MongoDB records of a single mdstore into a file on HDFS + * + * @param mdFormat the MDStore's format + * @param mdLayout the MDStore's layout + * @param mdInterpretation the MDStore's interpretation + * @param hdfsPath the basePath into hdfs where all MD-stores are stored + * @param fileSystem The Hadoop File system client + * @param currentMDStore The current Mongo MDStore ID + * @throws IOException + */ + private static void synchMDStoreIntoHDFS(String mdFormat, String mdLayout, String mdInterpretation, String hdfsPath, + FileSystem fileSystem, final String mongoBaseUrl, final String mongoDb, MDStoreInfo currentMDStore) + throws IOException { + // FIRST CREATE the directory basePath/MDSTOREID + final String mdstoreDir = createMDStoreDir(hdfsPath, currentMDStore.getMdstore()); + fileSystem.mkdirs(new Path(mdstoreDir)); + // Then sync all the records into basePath/MDSTOREID/timestamp + final String currentIdDir = createMDStoreDir(mdstoreDir, currentMDStore.getLatestTimestamp().toString()); + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(mongoBaseUrl, mongoDb, + currentIdDir)) { + app.execute(currentMDStore.getCurrentId(), mdFormat, mdLayout, mdInterpretation); + } catch (Exception e) { + throw new RuntimeException( + String + .format("Error on sync mdstore with ID %s into path %s", 
currentMDStore.getMdstore(), currentIdDir), + e); + } + log.info(String.format("Synchronized mdStore id : %s into path %s", currentMDStore.getMdstore(), currentIdDir)); + } + + public MigrateMongoMdstoresApplication(final String mongoBaseUrl, final String mongoDb, final String hdfsPath) + throws Exception { super(hdfsPath); this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); } - public void execute(final String format, final String layout, final String interpretation) { - final Map colls = mdstoreClient.validCollections(format, layout, interpretation); - log.info("Found {} mdstores", colls.size()); - - for (final Entry entry : colls.entrySet()) { - log.info("Processing mdstore {} (collection: {})", entry.getKey(), entry.getValue()); - final String currentColl = entry.getValue(); - - for (final String xml : mdstoreClient.listRecords(currentColl)) { - emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); - } + public void execute(final String currentColl, final String format, final String layout, + final String interpretation) { + for (final String xml : mdstoreClient.listRecords(currentColl)) { + emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 6f63e9327..b0dc7d7e7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -3,33 +3,20 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import 
java.util.stream.Collectors; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.raw.OafToOafMapper; -import eu.dnetlib.dhp.oa.graph.raw.OdfToOafMapper; -import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.utils.DHPUtils; public class AbstractMigrationApplication implements Closeable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json index ee1a6ac4e..b505b7fe0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json @@ -5,6 +5,12 @@ "paramDescription": "the path where storing the sequential file", "paramRequired": true }, + { + "paramName": "n", + "paramLongName": "nameNode", + "paramDescription": "the hdfs Name node url", + "paramRequired": true + }, { "paramName": "mongourl", "paramLongName": "mongoBaseUrl", diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 8262c6923..069dd09f7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -301,6 +301,7 @@ --mdFormatODF --mdLayoutstore --mdInterpretationcleaned + --nameNode${nameNode} @@ -326,6 +327,7 @@ --mdFormatOAF --mdLayoutstore --mdInterpretationcleaned + --nameNode${nameNode} @@ -343,6 +345,7 @@ --mdFormatOAF --mdLayoutstore --mdInterpretationintersection + --nameNode${nameNode} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml index 9b68cfb05..c57371560 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml @@ -4,18 +4,6 @@ migrationPathStep1 the base path to store hdfs file - - postgresURL - the postgres URL to access to the database - - - postgresUser - the user postgres - - - postgresPassword - the password postgres - mongoURL mongoDB url, example: mongodb://[username:password@]host[:port] @@ -24,27 +12,6 @@ mongoDb mongo database - - isLookupUrl - the address of the lookUp service - - - nsPrefixBlacklist - - a blacklist of nsprefixes (comma separeted) - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - @@ -58,37 +25,22 @@ - - - - - - - ${jobTracker} - ${nameNode} - 
eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication - -p${migrationPathStep1}/db_records - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - -islookup${isLookupUrl} - --nsPrefixBlacklist${nsPrefixBlacklist} - - ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationPathStep1}/odf_records + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${migrationPathStep1} -mongourl${mongoURL} -mongodb${mongoDb} -fODF -lstore -icleaned + --nameNode${nameNode} + @@ -98,13 +50,14 @@ ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationPathStep1}/oaf_records + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${migrationPathStep1} -mongourl${mongoURL} -mongodb${mongoDb} -fOAF -lstore -icleaned + --nameNode${nameNode}