forked from antonis.lempesis/dnet-hadoop
migration using spark in step2
This commit is contained in:
parent
5445a57102
commit
4b29a121b0
|
@ -1,4 +1,14 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.step1;
|
||||
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.asString;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listKeyValues;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -17,6 +27,8 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication;
|
||||
import eu.dnetlib.dhp.migration.utils.DbClient;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
|
@ -34,7 +46,7 @@ import eu.dnetlib.dhp.schema.oaf.Result;
|
|||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable {
|
||||
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable {
|
||||
|
||||
private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION =
|
||||
qualifier("sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry", "dnet:provenance_actions", "dnet:provenance_actions");
|
||||
|
|
|
@ -1,10 +1,23 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.step1;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication;
|
||||
import eu.dnetlib.dhp.migration.utils.MdstoreClient;
|
||||
|
||||
public class MigrateMongoMdstoresApplication {
|
||||
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
|
||||
|
||||
private static final Log log = LogFactory.getLog(MigrateMongoMdstoresApplication.class);
|
||||
|
||||
private final MdstoreClient mdstoreClient;
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
|
@ -22,24 +35,36 @@ public class MigrateMongoMdstoresApplication {
|
|||
final String hdfsNameNode = parser.get("namenode");
|
||||
final String hdfsUser = parser.get("hdfsUser");
|
||||
|
||||
final String dbUrl = parser.get("postgresUrl");
|
||||
final String dbUser = parser.get("postgresUser");
|
||||
final String dbPassword = parser.get("postgresPassword");
|
||||
|
||||
if (mdFormat.equalsIgnoreCase("oaf")) {
|
||||
try (final OafMigrationExecutor mig =
|
||||
new OafMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) {
|
||||
mig.processMdRecords(mdFormat, mdLayout, mdInterpretation);
|
||||
}
|
||||
} else if (mdFormat.equalsIgnoreCase("odf")) {
|
||||
try (final OdfMigrationExecutor mig =
|
||||
new OdfMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) {
|
||||
mig.processMdRecords(mdFormat, mdLayout, mdInterpretation);
|
||||
}
|
||||
} else {
|
||||
throw new RuntimeException("Format not supported: " + mdFormat);
|
||||
try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb)) {
|
||||
app.execute(mdFormat, mdLayout, mdInterpretation);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public MigrateMongoMdstoresApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl,
|
||||
final String mongoDb) throws Exception {
|
||||
super(hdfsPath, hdfsNameNode, hdfsUser);
|
||||
this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
|
||||
}
|
||||
|
||||
public void execute(final String format, final String layout, final String interpretation) {
|
||||
final Map<String, String> colls = mdstoreClient.validCollections(format, layout, interpretation);
|
||||
log.info("Found " + colls.size() + " mdstores");
|
||||
|
||||
for (final Entry<String, String> entry : colls.entrySet()) {
|
||||
log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")");
|
||||
final String currentColl = entry.getValue();
|
||||
|
||||
for (final String xml : mdstoreClient.listRecords(currentColl)) {
|
||||
emit(xml, "native_" + format);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
mdstoreClient.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,20 +1,24 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.step2;
|
||||
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.keyValue;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.oaiIProvenance;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.DocumentFactory;
|
||||
import org.dom4j.DocumentHelper;
|
||||
import org.dom4j.Node;
|
||||
|
@ -37,11 +41,9 @@ import eu.dnetlib.dhp.schema.oaf.Result;
|
|||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor {
|
||||
public abstract class AbstractMdRecordToOafMapper {
|
||||
|
||||
protected final Map<String, String> code2name = new HashMap<>();
|
||||
|
||||
protected final MdstoreClient mdstoreClient;
|
||||
protected final Map<String, String> code2name;
|
||||
|
||||
protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
|
||||
|
||||
|
@ -51,79 +53,36 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor {
|
|||
protected static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER = qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies");
|
||||
protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER = qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies");
|
||||
|
||||
private static final Log log = LogFactory.getLog(AbstractMongoExecutor.class);
|
||||
|
||||
public AbstractMongoExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl,
|
||||
final String mongoDb, final String dbUrl, final String dbUser,
|
||||
final String dbPassword) throws Exception {
|
||||
|
||||
super(hdfsPath, hdfsNameNode, hdfsUser);
|
||||
|
||||
this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
|
||||
loadClassNames(dbUrl, dbUser, dbPassword);
|
||||
|
||||
final Map<String, String> nsContext = new HashMap<>();
|
||||
|
||||
registerNamespaces(nsContext);
|
||||
|
||||
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
|
||||
protected AbstractMdRecordToOafMapper(final Map<String, String> code2name) {
|
||||
this.code2name = code2name;
|
||||
}
|
||||
|
||||
private void loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException {
|
||||
public List<Oaf> processMdRecord(final String xml) {
|
||||
try {
|
||||
final Map<String, String> nsContext = new HashMap<>();
|
||||
nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
|
||||
nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
|
||||
nsContext.put("oaf", "http://namespace.openaire.eu/oaf");
|
||||
nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/");
|
||||
nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance");
|
||||
nsContext.put("dc", "http://purl.org/dc/elements/1.1/");
|
||||
nsContext.put("datacite", "http://datacite.org/schema/kernel-3");
|
||||
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
|
||||
|
||||
log.info("Loading vocabulary terms from db...");
|
||||
final Document doc = DocumentHelper.parseText(xml);
|
||||
|
||||
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
|
||||
code2name.clear();
|
||||
dbClient.processResults("select code, name from class", rs -> {
|
||||
try {
|
||||
code2name.put(rs.getString("code"), rs.getString("name"));
|
||||
} catch (final SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
final String type = doc.valueOf("//dr:CobjCategory/@type");
|
||||
final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name"));
|
||||
final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom
|
||||
: keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name"));
|
||||
|
||||
final DataInfo info = prepareDataInfo(doc);
|
||||
final long lastUpdateTimestamp = new Date().getTime();
|
||||
|
||||
return createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp);
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
log.info("Found " + code2name.size() + " terms.");
|
||||
|
||||
}
|
||||
|
||||
public void processMdRecords(final String mdFormat, final String mdLayout, final String mdInterpretation) throws DocumentException {
|
||||
|
||||
log.info(String.format("Searching mdstores (format: %s, layout: %s, interpretation: %s)", mdFormat, mdLayout, mdInterpretation));
|
||||
|
||||
final Map<String, String> colls = mdstoreClient.validCollections(mdFormat, mdLayout, mdInterpretation);
|
||||
log.info("Found " + colls.size() + " mdstores");
|
||||
|
||||
for (final Entry<String, String> entry : colls.entrySet()) {
|
||||
log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")");
|
||||
final String currentColl = entry.getValue();
|
||||
|
||||
for (final String xml : mdstoreClient.listRecords(currentColl)) {
|
||||
final Document doc = DocumentHelper.parseText(xml);
|
||||
|
||||
final String type = doc.valueOf("//dr:CobjCategory/@type");
|
||||
final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name"));
|
||||
final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom
|
||||
: keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name"));
|
||||
|
||||
final DataInfo info = prepareDataInfo(doc);
|
||||
final long lastUpdateTimestamp = new Date().getTime();
|
||||
|
||||
for (final Oaf oaf : createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp)) {
|
||||
emitOaf(oaf);
|
||||
}
|
||||
}
|
||||
}
|
||||
log.info("All Done.");
|
||||
}
|
||||
|
||||
protected void registerNamespaces(final Map<String, String> nsContext) {
|
||||
nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
|
||||
nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
|
||||
nsContext.put("oaf", "http://namespace.openaire.eu/oaf");
|
||||
nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/");
|
||||
nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance");
|
||||
}
|
||||
|
||||
protected List<Oaf> createOafs(final Document doc,
|
||||
|
@ -432,10 +391,4 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor {
|
|||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
mdstoreClient.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
package eu.dnetlib.dhp.migration.step2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication;
|
||||
import eu.dnetlib.dhp.migration.utils.DbClient;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class GenerateEntitiesApplication {
|
||||
|
||||
private static final Log log = LogFactory.getLog(GenerateEntitiesApplication.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils.toString(MigrateMongoMdstoresApplication.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/migration/generate_entities_parameters.json")));
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
final String sourcePaths = parser.get("sourcePaths");
|
||||
final String targetPath = parser.get("targetPath");
|
||||
|
||||
final String dbUrl = parser.get("postgresUrl");
|
||||
final String dbUser = parser.get("postgresUser");
|
||||
final String dbPassword = parser.get("postgresPassword");
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(GenerateEntitiesApplication.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword);
|
||||
|
||||
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
||||
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
|
||||
generateEntities(sc, code2name, existingSourcePaths, targetPath);
|
||||
}
|
||||
}
|
||||
|
||||
private static void generateEntities(final JavaSparkContext sc,
|
||||
final Map<String, String> code2name,
|
||||
final List<String> sourcePaths,
|
||||
final String targetPath) {
|
||||
|
||||
log.info("Generate entities from files:");
|
||||
sourcePaths.forEach(log::info);
|
||||
|
||||
JavaRDD<String> inputRdd = sc.emptyRDD();
|
||||
|
||||
for (final String sp : sourcePaths) {
|
||||
inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class)
|
||||
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
|
||||
.map(k -> convertToListOaf(k._1(), k._2(), code2name))
|
||||
.flatMap(list -> list.iterator())
|
||||
.map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf)));
|
||||
}
|
||||
|
||||
inputRdd.saveAsTextFile(targetPath);
|
||||
|
||||
}
|
||||
|
||||
private static List<Oaf> convertToListOaf(final String id, final String s, final Map<String, String> code2name) {
|
||||
final String type = StringUtils.substringAfter(id, ":");
|
||||
|
||||
switch (type.toLowerCase()) {
|
||||
case "native_oaf":
|
||||
return new OafToOafMapper(code2name).processMdRecord(s);
|
||||
case "native_odf":
|
||||
return new OafToOafMapper(code2name).processMdRecord(s);
|
||||
case "datasource":
|
||||
return Arrays.asList(convertFromJson(s, Datasource.class));
|
||||
case "organization":
|
||||
return Arrays.asList(convertFromJson(s, Organization.class));
|
||||
case "project":
|
||||
return Arrays.asList(convertFromJson(s, Project.class));
|
||||
case "relation":
|
||||
return Arrays.asList(convertFromJson(s, Relation.class));
|
||||
case "publication":
|
||||
return Arrays.asList(convertFromJson(s, Publication.class));
|
||||
case "dataset":
|
||||
return Arrays.asList(convertFromJson(s, Dataset.class));
|
||||
case "software":
|
||||
return Arrays.asList(convertFromJson(s, Software.class));
|
||||
case "otherresearchproducts":
|
||||
default:
|
||||
return Arrays.asList(convertFromJson(s, OtherResearchProduct.class));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static Map<String, String> loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException {
|
||||
|
||||
log.info("Loading vocabulary terms from db...");
|
||||
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
|
||||
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
|
||||
dbClient.processResults("select code, name from class", rs -> {
|
||||
try {
|
||||
map.put(rs.getString("code"), rs.getString("name"));
|
||||
} catch (final SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.info("Found " + map.size() + " terms.");
|
||||
|
||||
return map;
|
||||
|
||||
}
|
||||
|
||||
private static String convertToJson(final Oaf oaf) {
|
||||
try {
|
||||
return new ObjectMapper().writeValueAsString(oaf);
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) {
|
||||
try {
|
||||
return new ObjectMapper().readValue(s, clazz);
|
||||
} catch (final Exception e) {
|
||||
log.error("Error parsing object of class: " + clazz);
|
||||
log.error(s);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
|
||||
try {
|
||||
final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
|
||||
final Path path = new Path(pathToFile);
|
||||
return hdfs.exists(path);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,16 +1,17 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.step2;
|
||||
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.Node;
|
||||
|
||||
import eu.dnetlib.dhp.migration.pace.PacePerson;
|
||||
import eu.dnetlib.dhp.migration.utils.PacePerson;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
|
@ -22,20 +23,10 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class OafMigrationExecutor extends AbstractMongoExecutor {
|
||||
public class OafToOafMapper extends AbstractMdRecordToOafMapper {
|
||||
|
||||
private static final Log log = LogFactory.getLog(OafMigrationExecutor.class);
|
||||
|
||||
public OafMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb,
|
||||
final String dbUrl, final String dbUser,
|
||||
final String dbPassword) throws Exception {
|
||||
super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void registerNamespaces(final Map<String, String> nsContext) {
|
||||
super.registerNamespaces(nsContext);
|
||||
nsContext.put("dc", "http://purl.org/dc/elements/1.1/");
|
||||
public OafToOafMapper(final Map<String, String> code2name) {
|
||||
super(code2name);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.step2;
|
||||
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -6,8 +10,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.Node;
|
||||
|
||||
|
@ -22,38 +24,28 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
||||
public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
|
||||
|
||||
private static final Log log = LogFactory.getLog(OdfMigrationExecutor.class);
|
||||
|
||||
public OdfMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb,
|
||||
final String dbUrl, final String dbUser,
|
||||
final String dbPassword) throws Exception {
|
||||
super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void registerNamespaces(final Map<String, String> nsContext) {
|
||||
super.registerNamespaces(nsContext);
|
||||
nsContext.put("dc", "http://datacite.org/schema/kernel-3");
|
||||
public OdfToOafMapper(final Map<String, String> code2name) {
|
||||
super(code2name);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<StructuredProperty> prepareTitles(final Document doc, final DataInfo info) {
|
||||
return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info);
|
||||
return prepareListStructProps(doc, "//datacite:title", MAIN_TITLE_QUALIFIER, info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Author> prepareAuthors(final Document doc, final DataInfo info) {
|
||||
final List<Author> res = new ArrayList<>();
|
||||
int pos = 1;
|
||||
for (final Object o : doc.selectNodes("//dc:creator")) {
|
||||
for (final Object o : doc.selectNodes("//datacite:creator")) {
|
||||
final Node n = (Node) o;
|
||||
final Author author = new Author();
|
||||
author.setFullname(n.valueOf("./dc:creatorName"));
|
||||
author.setName(n.valueOf("./dc:givenName"));
|
||||
author.setSurname(n.valueOf("./dc:familyName"));
|
||||
author.setAffiliation(prepareListFields(doc, "./dc:affiliation", info));
|
||||
author.setFullname(n.valueOf("./datacite:creatorName"));
|
||||
author.setName(n.valueOf("./datacite:givenName"));
|
||||
author.setSurname(n.valueOf("./datacite:familyName"));
|
||||
author.setAffiliation(prepareListFields(doc, "./datacite:affiliation", info));
|
||||
author.setPid(preparePids(doc, info));
|
||||
author.setRank(pos++);
|
||||
res.add(author);
|
||||
|
@ -63,7 +55,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
|
||||
private List<StructuredProperty> preparePids(final Document doc, final DataInfo info) {
|
||||
final List<StructuredProperty> res = new ArrayList<>();
|
||||
for (final Object o : doc.selectNodes("./dc:nameIdentifier")) {
|
||||
for (final Object o : doc.selectNodes("./datacite:nameIdentifier")) {
|
||||
res.add(structuredProperty(((Node) o).getText(), prepareQualifier((Node) o, "./@nameIdentifierScheme", "dnet:pid_types", "dnet:pid_types"), info));
|
||||
}
|
||||
return res;
|
||||
|
@ -72,7 +64,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
@Override
|
||||
protected List<Instance> prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) {
|
||||
final List<Instance> res = new ArrayList<>();
|
||||
for (final Object o : doc.selectNodes("//dc:alternateIdentifier[@alternateIdentifierType='URL']")) {
|
||||
for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) {
|
||||
final Instance instance = new Instance();
|
||||
instance.setUrl(Arrays.asList(((Node) o).getText().trim()));
|
||||
instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource"));
|
||||
|
@ -98,7 +90,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
@Override
|
||||
protected List<StructuredProperty> prepareRelevantDates(final Document doc, final DataInfo info) {
|
||||
final List<StructuredProperty> res = new ArrayList<>();
|
||||
for (final Object o : doc.selectNodes("//dc:date")) {
|
||||
for (final Object o : doc.selectNodes("//datacite:date")) {
|
||||
final String dateType = ((Node) o).valueOf("@dateType");
|
||||
if (StringUtils.isBlank(dateType) && !dateType.equalsIgnoreCase("Accepted") && !dateType.equalsIgnoreCase("Issued")
|
||||
&& !dateType.equalsIgnoreCase("Updated") && !dateType.equalsIgnoreCase("Available")) {
|
||||
|
@ -115,32 +107,32 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
|
||||
@Override
|
||||
protected List<Field<String>> prepareContributors(final Document doc, final DataInfo info) {
|
||||
return prepareListFields(doc, "//dc:contributorName", info);
|
||||
return prepareListFields(doc, "//datacite:contributorName", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Field<String>> prepareFormats(final Document doc, final DataInfo info) {
|
||||
return prepareListFields(doc, "//dc:format", info);
|
||||
return prepareListFields(doc, "//datacite:format", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Field<String> preparePublisher(final Document doc, final DataInfo info) {
|
||||
return prepareField(doc, "//dc:publisher", info);
|
||||
return prepareField(doc, "//datacite:publisher", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Field<String>> prepareDescriptions(final Document doc, final DataInfo info) {
|
||||
return prepareListFields(doc, "//dc:description[@descriptionType='Abstract']", info);
|
||||
return prepareListFields(doc, "//datacite:description[@descriptionType='Abstract']", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<StructuredProperty> prepareSubjects(final Document doc, final DataInfo info) {
|
||||
return prepareListStructProps(doc, "//dc:subject", info);
|
||||
return prepareListStructProps(doc, "//datacite:subject", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Qualifier prepareLanguages(final Document doc) {
|
||||
return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages");
|
||||
return prepareQualifier(doc, "//datacite:language", "dnet:languages", "dnet:languages");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -150,17 +142,17 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
|
||||
@Override
|
||||
protected List<Field<String>> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) {
|
||||
return prepareListFields(doc, "//dc:contributor[@contributorType='ContactGroup']/dc:contributorName", info);
|
||||
return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Field<String>> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) {
|
||||
return prepareListFields(doc, "//dc:contributor[@contributorType='ContactPerson']/dc:contributorName", info);
|
||||
return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) {
|
||||
return prepareQualifier(doc, "//dc:format", "dnet:programming_languages", "dnet:programming_languages");
|
||||
return prepareQualifier(doc, "//datacite:format", "dnet:programming_languages", "dnet:programming_languages");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -175,7 +167,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
|
||||
@Override
|
||||
protected List<Field<String>> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) {
|
||||
return prepareListFields(doc, "//dc:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info);
|
||||
return prepareListFields(doc, "//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info);
|
||||
}
|
||||
|
||||
// DATASETS
|
||||
|
@ -184,11 +176,11 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
protected List<GeoLocation> prepareDatasetGeoLocations(final Document doc, final DataInfo info) {
|
||||
final List<GeoLocation> res = new ArrayList<>();
|
||||
|
||||
for (final Object o : doc.selectNodes("//dc:geoLocation")) {
|
||||
for (final Object o : doc.selectNodes("//datacite:geoLocation")) {
|
||||
final GeoLocation loc = new GeoLocation();
|
||||
loc.setBox(((Node) o).valueOf("./dc:geoLocationBox"));
|
||||
loc.setPlace(((Node) o).valueOf("./dc:geoLocationPlace"));
|
||||
loc.setPoint(((Node) o).valueOf("./dc:geoLocationPoint"));
|
||||
loc.setBox(((Node) o).valueOf("./datacite:geoLocationBox"));
|
||||
loc.setPlace(((Node) o).valueOf("./datacite:geoLocationPlace"));
|
||||
loc.setPoint(((Node) o).valueOf("./datacite:geoLocationPoint"));
|
||||
res.add(loc);
|
||||
}
|
||||
return res;
|
||||
|
@ -201,17 +193,17 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
|
||||
@Override
|
||||
protected Field<String> prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) {
|
||||
return prepareField(doc, "//dc:date[@dateType='Updated']", info);
|
||||
return prepareField(doc, "//datacite:date[@dateType='Updated']", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Field<String> prepareDatasetVersion(final Document doc, final DataInfo info) {
|
||||
return prepareField(doc, "//dc:version", info);
|
||||
return prepareField(doc, "//datacite:version", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Field<String> prepareDatasetSize(final Document doc, final DataInfo info) {
|
||||
return prepareField(doc, "//dc:size", info);
|
||||
return prepareField(doc, "//datacite:size", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -221,7 +213,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor {
|
|||
|
||||
@Override
|
||||
protected Field<String> prepareDatasetStorageDate(final Document doc, final DataInfo info) {
|
||||
return prepareField(doc, "//dc:date[@dateType='Issued']", info);
|
||||
return prepareField(doc, "//datacite:date[@dateType='Issued']", info);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,22 +1,14 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
package eu.dnetlib.dhp.migration.step3;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
|
@ -25,70 +17,52 @@ import eu.dnetlib.dhp.schema.oaf.Project;
|
|||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ExtractEntitiesFromHDFSJob {
|
||||
public class DispatchEntitiesApplication {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ExtractEntitiesFromHDFSJob.class);
|
||||
private static final Log log = LogFactory.getLog(DispatchEntitiesApplication.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils.toString(MigrateMongoMdstoresApplication.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
|
||||
.appName(DispatchEntitiesApplication.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
||||
|
||||
final List<String> sourcePaths = Arrays.stream(parser.get("sourcePaths").split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
|
||||
final String sourcePath = parser.get("sourcePath");
|
||||
final String targetPath = parser.get("graphRawPath");
|
||||
|
||||
processEntity(sc, Publication.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Dataset.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Software.class, sourcePaths, targetPath);
|
||||
processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Datasource.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Organization.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Project.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Relation.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Publication.class, sourcePath, targetPath);
|
||||
processEntity(sc, Dataset.class, sourcePath, targetPath);
|
||||
processEntity(sc, Software.class, sourcePath, targetPath);
|
||||
processEntity(sc, OtherResearchProduct.class, sourcePath, targetPath);
|
||||
processEntity(sc, Datasource.class, sourcePath, targetPath);
|
||||
processEntity(sc, Organization.class, sourcePath, targetPath);
|
||||
processEntity(sc, Project.class, sourcePath, targetPath);
|
||||
processEntity(sc, Relation.class, sourcePath, targetPath);
|
||||
}
|
||||
}
|
||||
|
||||
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final List<String> sourcePaths, final String targetPath) {
|
||||
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final String sourcePath, final String targetPath) {
|
||||
final String type = clazz.getSimpleName().toLowerCase();
|
||||
|
||||
log.info(String.format("Processing entities (%s) in files:", type));
|
||||
sourcePaths.forEach(log::info);
|
||||
|
||||
JavaRDD<String> inputRdd = sc.emptyRDD();
|
||||
|
||||
for (final String sp : sourcePaths) {
|
||||
inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class)
|
||||
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
|
||||
.filter(k -> isEntityType(k._1(), type))
|
||||
.map(Tuple2::_2));
|
||||
}
|
||||
|
||||
inputRdd.saveAsTextFile(targetPath + "/" + type);
|
||||
log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
|
||||
|
||||
sc.textFile(sourcePath)
|
||||
.filter(l -> isEntityType(l, type))
|
||||
.map(l -> StringUtils.substringAfter(l, "|"))
|
||||
.saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ???
|
||||
}
|
||||
|
||||
private static boolean isEntityType(final String item, final String type) {
|
||||
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(type);
|
||||
private static boolean isEntityType(final String line, final String type) {
|
||||
return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
|
||||
}
|
||||
|
||||
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
|
||||
try {
|
||||
final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
|
||||
final Path path = new Path(pathToFile);
|
||||
return hdfs.exists(path);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,10 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.utils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -20,19 +14,9 @@ import org.apache.hadoop.io.SequenceFile;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
public class AbstractMigrationExecutor implements Closeable {
|
||||
public class AbstractMigrationApplication implements Closeable {
|
||||
|
||||
private final AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
|
@ -40,13 +24,13 @@ public class AbstractMigrationExecutor implements Closeable {
|
|||
|
||||
private final Text value = new Text();
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
private final SequenceFile.Writer writer;
|
||||
|
||||
private static final Log log = LogFactory.getLog(AbstractMigrationExecutor.class);
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
|
||||
private static final Log log = LogFactory.getLog(AbstractMigrationApplication.class);
|
||||
|
||||
public AbstractMigrationApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
|
||||
|
||||
log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
|
||||
|
||||
|
@ -65,186 +49,32 @@ public class AbstractMigrationExecutor implements Closeable {
|
|||
return conf;
|
||||
}
|
||||
|
||||
protected void emitOaf(final Oaf oaf) {
|
||||
protected void emit(final String s, final String type) {
|
||||
try {
|
||||
key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase());
|
||||
value.set(objectMapper.writeValueAsString(oaf));
|
||||
key.set(counter.getAndIncrement() + ":" + type);
|
||||
value.set(s);
|
||||
writer.append(key, value);
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void emitOaf(final Oaf oaf) {
|
||||
try {
|
||||
emit(objectMapper.writeValueAsString(oaf), oaf.getClass().getSimpleName().toLowerCase());
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public ObjectMapper getObjectMapper() {
|
||||
return objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
writer.hflush();
|
||||
writer.close();
|
||||
}
|
||||
|
||||
public static KeyValue keyValue(final String k, final String v) {
|
||||
final KeyValue kv = new KeyValue();
|
||||
kv.setKey(k);
|
||||
kv.setValue(v);
|
||||
return kv;
|
||||
}
|
||||
|
||||
public static List<KeyValue> listKeyValues(final String... s) {
|
||||
if (s.length % 2 > 0) { throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); }
|
||||
|
||||
final List<KeyValue> list = new ArrayList<>();
|
||||
for (int i = 0; i < s.length; i += 2) {
|
||||
list.add(keyValue(s[i], s[i + 1]));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
public static <T> Field<T> field(final T value, final DataInfo info) {
|
||||
if (value == null || StringUtils.isBlank(value.toString())) { return null; }
|
||||
|
||||
final Field<T> field = new Field<>();
|
||||
field.setValue(value);
|
||||
field.setDataInfo(info);
|
||||
return field;
|
||||
}
|
||||
|
||||
public static List<Field<String>> listFields(final DataInfo info, final String... values) {
|
||||
return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static List<Field<String>> listFields(final DataInfo info, final List<String> values) {
|
||||
return values.stream().map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) {
|
||||
final Qualifier q = new Qualifier();
|
||||
q.setClassid(classid);
|
||||
q.setClassname(classname);
|
||||
q.setSchemeid(schemeid);
|
||||
q.setSchemename(schemename);
|
||||
return q;
|
||||
}
|
||||
|
||||
public static StructuredProperty structuredProperty(final String value,
|
||||
final String classid,
|
||||
final String classname,
|
||||
final String schemeid,
|
||||
final String schemename,
|
||||
final DataInfo dataInfo) {
|
||||
|
||||
return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
|
||||
}
|
||||
|
||||
public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) {
|
||||
if (value == null) { return null; }
|
||||
final StructuredProperty sp = new StructuredProperty();
|
||||
sp.setValue(value);
|
||||
sp.setQualifier(qualifier);
|
||||
sp.setDataInfo(dataInfo);
|
||||
return sp;
|
||||
}
|
||||
|
||||
public static ExtraInfo extraInfo(final String name, final String value, final String typology, final String provenance, final String trust) {
|
||||
final ExtraInfo info = new ExtraInfo();
|
||||
info.setName(name);
|
||||
info.setValue(value);
|
||||
info.setTypology(typology);
|
||||
info.setProvenance(provenance);
|
||||
info.setTrust(trust);
|
||||
return info;
|
||||
}
|
||||
|
||||
public static OAIProvenance oaiIProvenance(final String identifier,
|
||||
final String baseURL,
|
||||
final String metadataNamespace,
|
||||
final Boolean altered,
|
||||
final String datestamp,
|
||||
final String harvestDate) {
|
||||
|
||||
final OriginDescription desc = new OriginDescription();
|
||||
desc.setIdentifier(identifier);
|
||||
desc.setBaseURL(baseURL);
|
||||
desc.setMetadataNamespace(metadataNamespace);
|
||||
desc.setAltered(altered);
|
||||
desc.setDatestamp(datestamp);
|
||||
desc.setHarvestDate(harvestDate);
|
||||
|
||||
final OAIProvenance p = new OAIProvenance();
|
||||
p.setOriginDescription(desc);
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
public static Journal journal(final String name,
|
||||
final String issnPrinted,
|
||||
final String issnOnline,
|
||||
final String issnLinking,
|
||||
final String ep,
|
||||
final String iss,
|
||||
final String sp,
|
||||
final String vol,
|
||||
final String edition,
|
||||
final String conferenceplace,
|
||||
final String conferencedate,
|
||||
final DataInfo dataInfo) {
|
||||
|
||||
if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) {
|
||||
final Journal j = new Journal();
|
||||
j.setName(name);
|
||||
j.setIssnPrinted(issnPrinted);
|
||||
j.setIssnOnline(issnOnline);
|
||||
j.setIssnLinking(issnLinking);
|
||||
j.setEp(ep);
|
||||
j.setIss(iss);
|
||||
j.setSp(sp);
|
||||
j.setVol(vol);
|
||||
j.setEdition(edition);
|
||||
j.setConferenceplace(conferenceplace);
|
||||
j.setConferencedate(conferencedate);
|
||||
j.setDataInfo(dataInfo);
|
||||
return j;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static DataInfo dataInfo(final Boolean deletedbyinference,
|
||||
final String inferenceprovenance,
|
||||
final Boolean inferred,
|
||||
final Boolean invisible,
|
||||
final Qualifier provenanceaction,
|
||||
final String trust) {
|
||||
final DataInfo d = new DataInfo();
|
||||
d.setDeletedbyinference(deletedbyinference);
|
||||
d.setInferenceprovenance(inferenceprovenance);
|
||||
d.setInferred(inferred);
|
||||
d.setInvisible(invisible);
|
||||
d.setProvenanceaction(provenanceaction);
|
||||
d.setTrust(trust);
|
||||
return d;
|
||||
}
|
||||
|
||||
public static String createOpenaireId(final int prefix, final String originalId) {
|
||||
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
|
||||
final String rest = StringUtils.substringAfter(originalId, "::");
|
||||
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
|
||||
}
|
||||
|
||||
public static String createOpenaireId(final String type, final String originalId) {
|
||||
switch (type) {
|
||||
case "datasource":
|
||||
return createOpenaireId(10, originalId);
|
||||
case "organization":
|
||||
return createOpenaireId(20, originalId);
|
||||
case "person":
|
||||
return createOpenaireId(30, originalId);
|
||||
case "project":
|
||||
return createOpenaireId(40, originalId);
|
||||
default:
|
||||
return createOpenaireId(50, originalId);
|
||||
}
|
||||
}
|
||||
|
||||
public static String asString(final Object o) {
|
||||
return o == null ? "" : o.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.utils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
package eu.dnetlib.dhp.migration.utils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -0,0 +1,191 @@
|
|||
package eu.dnetlib.dhp.migration.utils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
|
||||
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
public class OafMapperUtils {
|
||||
|
||||
public static KeyValue keyValue(final String k, final String v) {
|
||||
final KeyValue kv = new KeyValue();
|
||||
kv.setKey(k);
|
||||
kv.setValue(v);
|
||||
return kv;
|
||||
}
|
||||
|
||||
public static List<KeyValue> listKeyValues(final String... s) {
|
||||
if (s.length % 2 > 0) { throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); }
|
||||
|
||||
final List<KeyValue> list = new ArrayList<>();
|
||||
for (int i = 0; i < s.length; i += 2) {
|
||||
list.add(keyValue(s[i], s[i + 1]));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
public static <T> Field<T> field(final T value, final DataInfo info) {
|
||||
if (value == null || StringUtils.isBlank(value.toString())) { return null; }
|
||||
|
||||
final Field<T> field = new Field<>();
|
||||
field.setValue(value);
|
||||
field.setDataInfo(info);
|
||||
return field;
|
||||
}
|
||||
|
||||
public static List<Field<String>> listFields(final DataInfo info, final String... values) {
|
||||
return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static List<Field<String>> listFields(final DataInfo info, final List<String> values) {
|
||||
return values.stream().map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) {
|
||||
final Qualifier q = new Qualifier();
|
||||
q.setClassid(classid);
|
||||
q.setClassname(classname);
|
||||
q.setSchemeid(schemeid);
|
||||
q.setSchemename(schemename);
|
||||
return q;
|
||||
}
|
||||
|
||||
public static StructuredProperty structuredProperty(final String value,
|
||||
final String classid,
|
||||
final String classname,
|
||||
final String schemeid,
|
||||
final String schemename,
|
||||
final DataInfo dataInfo) {
|
||||
|
||||
return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
|
||||
}
|
||||
|
||||
public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) {
|
||||
if (value == null) { return null; }
|
||||
final StructuredProperty sp = new StructuredProperty();
|
||||
sp.setValue(value);
|
||||
sp.setQualifier(qualifier);
|
||||
sp.setDataInfo(dataInfo);
|
||||
return sp;
|
||||
}
|
||||
|
||||
public static ExtraInfo extraInfo(final String name, final String value, final String typology, final String provenance, final String trust) {
|
||||
final ExtraInfo info = new ExtraInfo();
|
||||
info.setName(name);
|
||||
info.setValue(value);
|
||||
info.setTypology(typology);
|
||||
info.setProvenance(provenance);
|
||||
info.setTrust(trust);
|
||||
return info;
|
||||
}
|
||||
|
||||
public static OAIProvenance oaiIProvenance(final String identifier,
|
||||
final String baseURL,
|
||||
final String metadataNamespace,
|
||||
final Boolean altered,
|
||||
final String datestamp,
|
||||
final String harvestDate) {
|
||||
|
||||
final OriginDescription desc = new OriginDescription();
|
||||
desc.setIdentifier(identifier);
|
||||
desc.setBaseURL(baseURL);
|
||||
desc.setMetadataNamespace(metadataNamespace);
|
||||
desc.setAltered(altered);
|
||||
desc.setDatestamp(datestamp);
|
||||
desc.setHarvestDate(harvestDate);
|
||||
|
||||
final OAIProvenance p = new OAIProvenance();
|
||||
p.setOriginDescription(desc);
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
public static Journal journal(final String name,
|
||||
final String issnPrinted,
|
||||
final String issnOnline,
|
||||
final String issnLinking,
|
||||
final String ep,
|
||||
final String iss,
|
||||
final String sp,
|
||||
final String vol,
|
||||
final String edition,
|
||||
final String conferenceplace,
|
||||
final String conferencedate,
|
||||
final DataInfo dataInfo) {
|
||||
|
||||
if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) {
|
||||
final Journal j = new Journal();
|
||||
j.setName(name);
|
||||
j.setIssnPrinted(issnPrinted);
|
||||
j.setIssnOnline(issnOnline);
|
||||
j.setIssnLinking(issnLinking);
|
||||
j.setEp(ep);
|
||||
j.setIss(iss);
|
||||
j.setSp(sp);
|
||||
j.setVol(vol);
|
||||
j.setEdition(edition);
|
||||
j.setConferenceplace(conferenceplace);
|
||||
j.setConferencedate(conferencedate);
|
||||
j.setDataInfo(dataInfo);
|
||||
return j;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static DataInfo dataInfo(final Boolean deletedbyinference,
|
||||
final String inferenceprovenance,
|
||||
final Boolean inferred,
|
||||
final Boolean invisible,
|
||||
final Qualifier provenanceaction,
|
||||
final String trust) {
|
||||
final DataInfo d = new DataInfo();
|
||||
d.setDeletedbyinference(deletedbyinference);
|
||||
d.setInferenceprovenance(inferenceprovenance);
|
||||
d.setInferred(inferred);
|
||||
d.setInvisible(invisible);
|
||||
d.setProvenanceaction(provenanceaction);
|
||||
d.setTrust(trust);
|
||||
return d;
|
||||
}
|
||||
|
||||
public static String createOpenaireId(final int prefix, final String originalId) {
|
||||
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
|
||||
final String rest = StringUtils.substringAfter(originalId, "::");
|
||||
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
|
||||
}
|
||||
|
||||
public static String createOpenaireId(final String type, final String originalId) {
|
||||
switch (type) {
|
||||
case "datasource":
|
||||
return createOpenaireId(10, originalId);
|
||||
case "organization":
|
||||
return createOpenaireId(20, originalId);
|
||||
case "person":
|
||||
return createOpenaireId(30, originalId);
|
||||
case "project":
|
||||
return createOpenaireId(40, originalId);
|
||||
default:
|
||||
return createOpenaireId(50, originalId);
|
||||
}
|
||||
}
|
||||
|
||||
public static String asString(final Object o) {
|
||||
return o == null ? "" : o.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.migration.pace;
|
||||
package eu.dnetlib.dhp.migration.utils;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.text.Normalizer;
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
[
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePaths",
|
||||
"paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the source path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
[
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePaths",
|
||||
"paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "the path of the target file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pgurl",
|
||||
"paramLongName": "postgresUrl",
|
||||
"paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pguser",
|
||||
"paramLongName": "postgresUser",
|
||||
"paramDescription": "postgres user",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "pgpasswd",
|
||||
"paramLongName": "postgresPassword",
|
||||
"paramDescription": "postgres password",
|
||||
"paramRequired": false
|
||||
}
|
||||
|
||||
]
|
|
@ -18,19 +18,19 @@
|
|||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dburl",
|
||||
"paramName": "pgurl",
|
||||
"paramLongName": "postgresUrl",
|
||||
"paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dbuser",
|
||||
"paramName": "pguser",
|
||||
"paramLongName": "postgresUser",
|
||||
"paramDescription": "postgres user",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "dbpasswd",
|
||||
"paramName": "pgpasswd",
|
||||
"paramLongName": "postgresPassword",
|
||||
"paramDescription": "postgres password",
|
||||
"paramRequired": false
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "db",
|
||||
"paramName": "mongodb",
|
||||
"paramLongName": "mongoDb",
|
||||
"paramDescription": "mongo database",
|
||||
"paramRequired": true
|
||||
|
@ -46,23 +46,5 @@
|
|||
"paramLongName": "mdInterpretation",
|
||||
"paramDescription": "metadata interpretation",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pgurl",
|
||||
"paramLongName": "postgresUrl",
|
||||
"paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pguser",
|
||||
"paramLongName": "postgresUser",
|
||||
"paramDescription": "postgres user",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "pgpasswd",
|
||||
"paramLongName": "postgresPassword",
|
||||
"paramDescription": "postgres password",
|
||||
"paramRequired": false
|
||||
}
|
||||
]
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hdfsUser</name>
|
||||
<value>dnet</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,175 @@
|
|||
<workflow-app name="import Claims as Graph" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>migrationClaimsPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationClaimsPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationClaimsPathStep3</name>
|
||||
<description>the graph Raw base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoURL</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoDb</name>
|
||||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetWorkingPath">
|
||||
<fs>
|
||||
<delete path='${migrationClaimsPathStep1}'/>
|
||||
<mkdir path='${migrationClaimsPathStep1}'/>
|
||||
</fs>
|
||||
<ok to="ImportDBClaims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportDBClaims">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationClaimsPathStep1}/db_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
<arg>-a</arg><arg>claims</arg>
|
||||
</java>
|
||||
<ok to="ImportODFClaims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODFClaims">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationClaimsPathStep1}/odf_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
</java>
|
||||
<ok to="ImportOAFClaims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAFClaims">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationClaimsPathStep1}/oaf_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
</java>
|
||||
<ok to="ResetClaimEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResetClaimEntities">
|
||||
<fs>
|
||||
<delete path='${migrationClaimsPathStep2}'/>
|
||||
<mkdir path='${migrationClaimsPathStep2}'/>
|
||||
</fs>
|
||||
<ok to="GenerateClaimEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateClaimEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateClaimEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims</arg>
|
||||
<arg>-t</arg><arg>${migrationClaimsPathStep2}/claim_entities</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</spark>
|
||||
<ok to="ResetClaimGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResetClaimGraph">
|
||||
<fs>
|
||||
<delete path='${migrationClaimsPathStep3}'/>
|
||||
<mkdir path='${migrationClaimsPathStep3}'/>
|
||||
</fs>
|
||||
<ok to="GenerateClaimGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateClaimGraph">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateClaimGraph</name>
|
||||
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationClaimsPathStep2}/claim_entities</arg>
|
||||
<arg>-g</arg><arg>${migrationClaimsPathStep3}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -1,14 +1,17 @@
|
|||
<workflow-app name="import Entities from aggretor to HDFS" xmlns="uri:oozie:workflow:0.5">
|
||||
<workflow-app name="import regular entities as Graph (all steps)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<name>migrationPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>graphRawPath</name>
|
||||
<name>migrationPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationPathStep3</name>
|
||||
<description>the graph Raw base path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
|
@ -22,7 +25,7 @@
|
|||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongourl</name>
|
||||
<name>mongoURL</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
|
@ -51,152 +54,117 @@
|
|||
|
||||
<action name="ResetWorkingPath">
|
||||
<fs>
|
||||
<delete path='${workingPath}'/>
|
||||
<mkdir path='${workingPath}'/>
|
||||
<delete path='${migrationPathStep1}'/>
|
||||
<mkdir path='${migrationPathStep1}'/>
|
||||
</fs>
|
||||
<ok to="ImportEntitiesFromPostgres"/>
|
||||
<ok to="ImportDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportEntitiesFromPostgres">
|
||||
<action name="ImportDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/db_entities</arg>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-dburl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-dbuser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportClaimsFromPostgres"/>
|
||||
<ok to="ImportODF"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportClaimsFromPostgres">
|
||||
<action name="ImportODF">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/db_claims</arg>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-dburl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-dbuser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
|
||||
<arg>-a</arg><arg>claims</arg>
|
||||
</java>
|
||||
<ok to="ImportODFEntitiesFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODFEntitiesFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/odf_entities</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportOAFEntitiesFromMongoDB"/>
|
||||
<ok to="ImportOAF"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAFEntitiesFromMongoDB">
|
||||
<action name="ImportOAF">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/oaf_entities</arg>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportODFClaimsFromMongoDB"/>
|
||||
<ok to="ResetEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODFClaimsFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/odf_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportOAFClaimsFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAFClaimsFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/oaf_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ResetGraphRawPath"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResetGraphRawPath">
|
||||
<action name="ResetEntities">
|
||||
<fs>
|
||||
<delete path='${graphRawPath}'/>
|
||||
<mkdir path='${graphRawPath}'/>
|
||||
<delete path='${migrationPathStep2}'/>
|
||||
<mkdir path='${migrationPathStep2}'/>
|
||||
</fs>
|
||||
<ok to="ExtractEntitiesInGraphRawPath"/>
|
||||
<ok to="GenerateEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ExtractEntitiesInGraphRawPath">
|
||||
<action name="GenerateEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<name>GenerateEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${workingPath}/db_entities,${workingPath}/oaf_entities,${workingPath}/odf_entities</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}</arg>
|
||||
<arg>-s</arg><arg>${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records</arg>
|
||||
<arg>-t</arg><arg>${migrationPathStep2}/all_entities</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</spark>
|
||||
<ok to="ResetGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResetGraph">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep3}'/>
|
||||
<mkdir path='${migrationPathStep3}'/>
|
||||
</fs>
|
||||
<ok to="GenerateGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateGraph">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateGraph</name>
|
||||
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationPathStep2}/all_entities</arg>
|
||||
<arg>-g</arg><arg>${migrationPathStep3}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hdfsUser</name>
|
||||
<value>dnet</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,109 @@
|
|||
<workflow-app name="import regular entities as Graph (step 1)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>migrationPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoURL</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoDb</name>
|
||||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetWorkingPath">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep1}'/>
|
||||
<mkdir path='${migrationPathStep1}'/>
|
||||
</fs>
|
||||
<ok to="ImportDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/db_records</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportODF"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODF">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/odf_records</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
</java>
|
||||
<ok to="ImportOAF"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAF">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${migrationPathStep1}/oaf_records</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongoURL}</arg>
|
||||
<arg>-mongodb</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hdfsUser</name>
|
||||
<value>dnet</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,74 @@
|
|||
<workflow-app name="import regular entities as Graph (step 2)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>migrationPathStep1</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetEntities"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetEntities">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep2}'/>
|
||||
<mkdir path='${migrationPathStep2}'/>
|
||||
</fs>
|
||||
<ok to="GenerateEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records</arg>
|
||||
<arg>-t</arg><arg>${migrationPathStep2}/all_entities</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hdfsUser</name>
|
||||
<value>dnet</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,60 @@
|
|||
<workflow-app name="import regular entities as Graph (step 3)" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
|
||||
<property>
|
||||
<name>migrationPathStep2</name>
|
||||
<description>the temporary path to store entities before dispatching</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>migrationPathStep3</name>
|
||||
<description>the graph Raw base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetGraph"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetGraph">
|
||||
<fs>
|
||||
<delete path='${migrationPathStep3}'/>
|
||||
<mkdir path='${migrationPathStep3}'/>
|
||||
</fs>
|
||||
<ok to="GenerateGraph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="GenerateGraph">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateGraph</name>
|
||||
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>-s</arg><arg>${migrationPathStep2}/all_entities</arg>
|
||||
<arg>-g</arg><arg>${migrationPathStep3}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
Loading…
Reference in New Issue