Merge pull request '8232-mdstore-synch-improve' (#272) from 8232-mdstore-synch-improve into beta

Reviewed-on: #272
This commit is contained in:
Claudio Atzori 2023-02-22 10:02:26 +01:00
commit a590c371a9
8 changed files with 352 additions and 93 deletions

View File

@ -0,0 +1,100 @@
package eu.dnetlib.dhp.common;
/**
* This utility represent the Metadata Store information
* needed during the migration from mongo to HDFS to store
*/
public class MDStoreInfo {
private String mdstore;
private String currentId;
private Long latestTimestamp;
/**
* Instantiates a new Md store info.
*/
public MDStoreInfo() {
}
/**
* Instantiates a new Md store info.
*
* @param mdstore the mdstore
* @param currentId the current id
* @param latestTimestamp the latest timestamp
*/
public MDStoreInfo(String mdstore, String currentId, Long latestTimestamp) {
this.mdstore = mdstore;
this.currentId = currentId;
this.latestTimestamp = latestTimestamp;
}
/**
* Gets mdstore.
*
* @return the mdstore
*/
public String getMdstore() {
return mdstore;
}
/**
* Sets mdstore.
*
* @param mdstore the mdstore
* @return the mdstore
*/
public MDStoreInfo setMdstore(String mdstore) {
this.mdstore = mdstore;
return this;
}
/**
* Gets current id.
*
* @return the current id
*/
public String getCurrentId() {
return currentId;
}
/**
* Sets current id.
*
* @param currentId the current id
* @return the current id
*/
public MDStoreInfo setCurrentId(String currentId) {
this.currentId = currentId;
return this;
}
/**
* Gets latest timestamp.
*
* @return the latest timestamp
*/
public Long getLatestTimestamp() {
return latestTimestamp;
}
/**
* Sets latest timestamp.
*
* @param latestTimestamp the latest timestamp
* @return the latest timestamp
*/
public MDStoreInfo setLatestTimestamp(Long latestTimestamp) {
this.latestTimestamp = latestTimestamp;
return this;
}
@Override
public String toString() {
return "MDStoreInfo{" +
"mdstore='" + mdstore + '\'' +
", currentId='" + currentId + '\'' +
", latestTimestamp=" + latestTimestamp +
'}';
}
}

View File

@ -1,12 +1,12 @@
package eu.dnetlib.dhp.common;
import static com.mongodb.client.model.Sorts.descending;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
@ -38,6 +38,26 @@ public class MdstoreClient implements Closeable {
this.db = getDb(client, dbName);
}
private Long parseTimestamp(Document f) {
if (f == null || !f.containsKey("timestamp"))
return null;
Object ts = f.get("timestamp");
return Long.parseLong(ts.toString());
}
public Long getLatestTimestamp(final String collectionId) {
MongoCollection<Document> collection = db.getCollection(collectionId);
FindIterable<Document> result = collection.find().sort(descending("timestamp")).limit(1);
if (result == null) {
return null;
}
Document f = result.first();
return parseTimestamp(f);
}
public MongoCollection<Document> mdStore(final String mdId) {
BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
@ -54,6 +74,16 @@ public class MdstoreClient implements Closeable {
return getColl(db, currentId, true);
}
public List<MDStoreInfo> mdStoreWithTimestamp(final String mdFormat, final String mdLayout,
final String mdInterpretation) {
Map<String, String> res = validCollections(mdFormat, mdLayout, mdInterpretation);
return res
.entrySet()
.stream()
.map(e -> new MDStoreInfo(e.getKey(), e.getValue(), getLatestTimestamp(e.getValue())))
.collect(Collectors.toList());
}
public Map<String, String> validCollections(
final String mdFormat, final String mdLayout, final String mdInterpretation) {

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.common;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MdStoreClientTest {
@Test
public void testMongoCollection() throws IOException {
final MdstoreClient client = new MdstoreClient("mongodb://localhost:27017", "mdstore");
final ObjectMapper mapper = new ObjectMapper();
final List<MDStoreInfo> infos = client.mdStoreWithTimestamp("ODF", "store", "cleaned");
infos.forEach(System.out::println);
final String s = mapper.writeValueAsString(infos);
Path fileName = Paths.get("/Users/sandro/mdstore_info.json");
// Writing into the file
Files.write(fileName, s.getBytes(StandardCharsets.UTF_8));
}
}

View File

@ -1,31 +1,87 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.MDStoreInfo;
import eu.dnetlib.dhp.common.MdstoreClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplication.class);
private final MdstoreClient mdstoreClient;
private static List<MDStoreInfo> snapshotsMDStores(final MdstoreClient client,
final String format,
final String layout,
final String interpretation) {
return client.mdStoreWithTimestamp(format, layout, interpretation);
}
private static MDStoreInfo extractPath(final String path, final String basePath) {
int res = path.indexOf(basePath);
if (res > 0) {
String[] split = path.substring(res).split("/");
if (split.length > 2) {
final String ts = split[split.length - 1];
final String mdStore = split[split.length - 2];
return new MDStoreInfo(mdStore, null, Long.parseLong(ts));
}
}
return null;
}
private static Map<String, MDStoreInfo> hdfsMDStoreInfo(FileSystem fs, final String basePath) throws IOException {
final Map<String, MDStoreInfo> hdfs_store = new HashMap<>();
final Path p = new Path(basePath);
final RemoteIterator<LocatedFileStatus> ls = fs.listFiles(p, true);
while (ls.hasNext()) {
String current = ls.next().getPath().toString();
final MDStoreInfo info = extractPath(current, basePath);
if (info != null) {
hdfs_store.put(info.getMdstore(), info);
}
}
return hdfs_store;
}
private static String createMDStoreDir(final String basePath, final String mdStoreId) {
if (basePath.endsWith("/")) {
return basePath + mdStoreId;
} else {
return String.format("%s/%s", basePath, mdStoreId);
}
}
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateMongoMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json")));
Objects
.requireNonNull(
MigrateMongoMdstoresApplication.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json"))));
parser.parseArgument(args);
final String mongoBaseUrl = parser.get("mongoBaseUrl");
@ -36,30 +92,118 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio
final String mdInterpretation = parser.get("mdInterpretation");
final String hdfsPath = parser.get("hdfsPath");
final String nameNode = parser.get("nameNode");
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(nameNode));
final MdstoreClient mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
final List<MDStoreInfo> mongoMDStores = snapshotsMDStores(mdstoreClient, mdFormat, mdLayout, mdInterpretation);
final Map<String, MDStoreInfo> hdfsMDStores = hdfsMDStoreInfo(fileSystem, hdfsPath);
mongoMDStores
.stream()
.filter(currentMDStore -> currentMDStore.getLatestTimestamp() != null)
.forEach(
consumeMDStore(
mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, hdfsMDStores));
// TODO: DELETE MDStORE FOLDER NOT PRESENT IN MONGO
try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl,
mongoDb)) {
app.execute(mdFormat, mdLayout, mdInterpretation);
}
}
public MigrateMongoMdstoresApplication(
final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception {
/**
* This method is responsible to sync only the stores that have been changed since last time
* @param mdFormat the MDStore's format
* @param mdLayout the MDStore'slayout
* @param mdInterpretation the MDStore's interpretation
* @param hdfsPath the basePath into hdfs where all MD-stores are stored
* @param fileSystem The Hadoop File system client
* @param hdfsMDStores A Map containing as Key the mdstore ID and as value the @{@link MDStoreInfo}
* @return
*/
private static Consumer<MDStoreInfo> consumeMDStore(String mdFormat, String mdLayout, String mdInterpretation,
String hdfsPath, FileSystem fileSystem, final String mongoBaseUrl, final String mongoDb,
Map<String, MDStoreInfo> hdfsMDStores) {
return currentMDStore -> {
// If the key is missing it means that the mdstore is not present in hdfs
// that is the hdfs path basePath/MDSTOREID/timestamp is missing
// So we have to synch it
if (!hdfsMDStores.containsKey(currentMDStore.getMdstore())) {
log.info("Adding store " + currentMDStore.getMdstore());
try {
synchMDStoreIntoHDFS(
mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb,
currentMDStore);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
final MDStoreInfo current = hdfsMDStores.get(currentMDStore.getMdstore());
// IF the key is present it means that in hdfs we have a path
// basePath/MDSTOREID/timestamp but the timestamp on hdfs is older that the
// new one in mongo so we have to synch the new mdstore and delete the old one
if (currentMDStore.getLatestTimestamp() > current.getLatestTimestamp()) {
log.info("Updating MDStore " + currentMDStore.getMdstore());
final String mdstoreDir = createMDStoreDir(hdfsPath, currentMDStore.getMdstore());
final String rmPath = createMDStoreDir(mdstoreDir, current.getLatestTimestamp().toString());
try {
synchMDStoreIntoHDFS(
mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb,
currentMDStore);
log.info("deleting " + rmPath);
// DELETE THE OLD MDSTORE
fileSystem.delete(new Path(rmPath), true);
} catch (IOException e) {
throw new RuntimeException("Unable to synch and remove path " + rmPath, e);
}
}
}
};
}
/**
*This method store into hdfs all the MONGO record of a single mdstore into the HDFS File
*
* @param mdFormat the MDStore's format
* @param mdLayout the MDStore'slayout
* @param mdInterpretation the MDStore's interpretation
* @param hdfsPath the basePath into hdfs where all MD-stores are stored
* @param fileSystem The Hadoop File system client
* @param currentMDStore The current Mongo MDStore ID
* @throws IOException
*/
private static void synchMDStoreIntoHDFS(String mdFormat, String mdLayout, String mdInterpretation, String hdfsPath,
FileSystem fileSystem, final String mongoBaseUrl, final String mongoDb, MDStoreInfo currentMDStore)
throws IOException {
// FIRST CREATE the directory basePath/MDSTOREID
final String mdstoreDir = createMDStoreDir(hdfsPath, currentMDStore.getMdstore());
fileSystem.mkdirs(new Path(mdstoreDir));
// Then synch all the records into basePath/MDSTOREID/timestamp
final String currentIdDir = createMDStoreDir(mdstoreDir, currentMDStore.getLatestTimestamp().toString());
try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(mongoBaseUrl, mongoDb,
currentIdDir)) {
app.execute(currentMDStore.getCurrentId(), mdFormat, mdLayout, mdInterpretation);
} catch (Exception e) {
throw new RuntimeException(
String
.format("Error on sync mdstore with ID %s into path %s", currentMDStore.getMdstore(), currentIdDir),
e);
}
log.info(String.format("Synchronized mdStore id : %s into path %s", currentMDStore.getMdstore(), currentIdDir));
}
public MigrateMongoMdstoresApplication(final String mongoBaseUrl, final String mongoDb, final String hdfsPath)
throws Exception {
super(hdfsPath);
this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
}
public void execute(final String format, final String layout, final String interpretation) {
final Map<String, String> colls = mdstoreClient.validCollections(format, layout, interpretation);
log.info("Found {} mdstores", colls.size());
for (final Entry<String, String> entry : colls.entrySet()) {
log.info("Processing mdstore {} (collection: {})", entry.getKey(), entry.getValue());
final String currentColl = entry.getValue();
for (final String xml : mdstoreClient.listRecords(currentColl)) {
emit(xml, String.format("%s-%s-%s", format, layout, interpretation));
}
public void execute(final String currentColl, final String format, final String layout,
final String interpretation) {
for (final String xml : mdstoreClient.listRecords(currentColl)) {
emit(xml, String.format("%s-%s-%s", format, layout, interpretation));
}
}

View File

@ -3,33 +3,20 @@ package eu.dnetlib.dhp.oa.graph.raw.common;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.OafToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.OdfToOafMapper;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.utils.DHPUtils;
public class AbstractMigrationApplication implements Closeable {

View File

@ -5,6 +5,12 @@
"paramDescription": "the path where storing the sequential file",
"paramRequired": true
},
{
"paramName": "n",
"paramLongName": "nameNode",
"paramDescription": "the hdfs Name node url",
"paramRequired": true
},
{
"paramName": "mongourl",
"paramLongName": "mongoBaseUrl",

View File

@ -301,6 +301,7 @@
<arg>--mdFormat</arg><arg>ODF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="reuse_oaf"/>
<error to="Kill"/>
@ -326,6 +327,7 @@
<arg>--mdFormat</arg><arg>OAF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="ImportOAF_invisible"/>
<error to="Kill"/>
@ -343,6 +345,7 @@
<arg>--mdFormat</arg><arg>OAF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>intersection</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="ImportODF_hdfs"/>
<error to="Kill"/>

View File

@ -4,18 +4,6 @@
<name>migrationPathStep1</name>
<description>the base path to store hdfs file</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
@ -24,27 +12,6 @@
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>nsPrefixBlacklist</name>
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
@ -58,37 +25,22 @@
<delete path='${migrationPathStep1}'/>
<mkdir path='${migrationPathStep1}'/>
</fs>
<ok to="ImportDB"/>
<error to="Kill"/>
</action>
<action name="ImportDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
<arg>-islookup</arg><arg>${isLookupUrl}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>
</action>
<action name="ImportODF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="ImportOAF"/>
<error to="Kill"/>
@ -98,13 +50,14 @@
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${migrationPathStep1}</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>