From 2bed29eb09616e01ee1d26c082847d7298db9e98 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Nov 2020 10:05:12 +0100 Subject: [PATCH] WIP: added oozie workflow for grouping graph entities by id --- .../dhp/schema/oaf/OafMapperUtils.java | 297 +++++++++++++++++- .../dhp/schema/oaf/ResultTypeComparator.java | 49 ++- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 7 + .../oa/graph/clean/CleanGraphSparkJob.java | 10 +- .../dhp/oa/graph/clean/CleaningFunctions.java | 1 - .../groupbyid/DispatchEntitiesSparkJob.java | 64 ++-- .../GroupEntitiesAndRelationsSparkJob.java | 80 +++-- .../raw/AbstractMdRecordToOafMapper.java | 2 +- .../raw/GenerateEntitiesApplication.java | 24 +- .../raw/MigrateDbEntitiesApplication.java | 10 +- .../dhp/oa/graph/raw/OafToOafMapper.java | 6 +- .../dhp/oa/graph/raw/OdfToOafMapper.java | 6 +- .../dhp/oa/graph/raw/common/Vocabulary.java | 1 + .../oa/graph/raw/common/VocabularyGroup.java | 1 + .../dispatch_entities_bytype_parameters.json | 16 +- .../oa/graph/groupbyid/oozie_app/workflow.xml | 188 ++++++----- .../dhp/oa/graph/merge/oozie_app/workflow.xml | 4 +- .../raw/MigrateDbEntitiesApplicationTest.java | 8 +- .../dhp/oa/provision/XmlConverterJob.java | 15 +- 19 files changed, 578 insertions(+), 211 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java index 6ba918069..4a66f91dc 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java @@ -1,2 +1,297 @@ -package eu.dnetlib.dhp.schema.oaf;public class OafMapperUtils { + +package eu.dnetlib.dhp.schema.oaf; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.utils.DHPUtils; + +public class OafMapperUtils { + + public static Oaf merge(final Oaf o1, final Oaf o2) { + if (ModelSupport.isSubClass(o1, OafEntity.class)) { + if (ModelSupport.isSubClass(o1, Result.class)) { + + return mergeResults((Result) o1, (Result) o2); + } else if (ModelSupport.isSubClass(o1, Datasource.class)) { + ((Datasource) o1).mergeFrom((Datasource) o2); + } else if (ModelSupport.isSubClass(o1, Organization.class)) { + ((Organization) o1).mergeFrom((Organization) o2); + } else if (ModelSupport.isSubClass(o1, Project.class)) { + ((Project) o1).mergeFrom((Project) o2); + } else { + throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName()); + } + } else if (ModelSupport.isSubClass(o1, Relation.class)) { + ((Relation) o1).mergeFrom((Relation) o2); + } else { + throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); + } + return o1; + } + + public static Result mergeResults(Result r1, Result r2) { + if (new ResultTypeComparator().compare(r1, r2) < 0) { + r1.mergeFrom(r2); + return r1; + } else { + r2.mergeFrom(r1); + return r2; + } + } + + public static KeyValue keyValue(final String k, final String v) { + final KeyValue kv = new KeyValue(); + kv.setKey(k); + kv.setValue(v); + return kv; + } + + public static List listKeyValues(final String... 
s) { + if (s.length % 2 > 0) { + throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); + } + + final List list = new ArrayList<>(); + for (int i = 0; i < s.length; i += 2) { + list.add(keyValue(s[i], s[i + 1])); + } + return list; + } + + public static Field field(final T value, final DataInfo info) { + if (value == null || StringUtils.isBlank(value.toString())) { + return null; + } + + final Field field = new Field<>(); + field.setValue(value); + field.setDataInfo(info); + return field; + } + + public static List> listFields(final DataInfo info, final String... values) { + return Arrays + .stream(values) + .map(v -> field(v, info)) + .filter(Objects::nonNull) + .filter(distinctByKey(f -> f.getValue())) + .collect(Collectors.toList()); + } + + public static List> listFields(final DataInfo info, final List values) { + return values + .stream() + .map(v -> field(v, info)) + .filter(Objects::nonNull) + .filter(distinctByKey(f -> f.getValue())) + .collect(Collectors.toList()); + } + + public static Qualifier unknown(final String schemeid, final String schemename) { + return qualifier("UNKNOWN", "Unknown", schemeid, schemename); + } + + public static Qualifier qualifier( + final String classid, + final String classname, + final String schemeid, + final String schemename) { + final Qualifier q = new Qualifier(); + q.setClassid(classid); + q.setClassname(classname); + q.setSchemeid(schemeid); + q.setSchemename(schemename); + return q; + } + + public static StructuredProperty structuredProperty( + final String value, + final String classid, + final String classname, + final String schemeid, + final String schemename, + final DataInfo dataInfo) { + + return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo); + } + + public static StructuredProperty structuredProperty( + final String value, + final Qualifier qualifier, + final DataInfo dataInfo) { + if (value == null) { + return null; + } + final StructuredProperty sp = new StructuredProperty(); + sp.setValue(value); + sp.setQualifier(qualifier); + sp.setDataInfo(dataInfo); + return sp; + } + + public static ExtraInfo extraInfo( + final String name, + final String value, + final String typology, + final String provenance, + final String trust) { + final ExtraInfo info = new ExtraInfo(); + info.setName(name); + info.setValue(value); + info.setTypology(typology); + info.setProvenance(provenance); + info.setTrust(trust); + return info; + } + + public static OAIProvenance oaiIProvenance( + final String identifier, + final String baseURL, + final String metadataNamespace, + final Boolean altered, + final String datestamp, + final String harvestDate) { + + final OriginDescription desc = new OriginDescription(); + desc.setIdentifier(identifier); + desc.setBaseURL(baseURL); + desc.setMetadataNamespace(metadataNamespace); + desc.setAltered(altered); + desc.setDatestamp(datestamp); + desc.setHarvestDate(harvestDate); + + final OAIProvenance p = new OAIProvenance(); + p.setOriginDescription(desc); + + return p; + } + + public static Journal journal( + final String name, + final String issnPrinted, + final String issnOnline, + final String issnLinking, + final DataInfo dataInfo) { + return journal( + name, + issnPrinted, + issnOnline, + issnLinking, + null, + null, + null, + null, + null, + null, + null, + dataInfo); + } + + public static Journal journal( + final String name, + final String issnPrinted, + final String issnOnline, + final String issnLinking, + final String ep, + final String iss, + final 
String sp, + final String vol, + final String edition, + final String conferenceplace, + final String conferencedate, + final DataInfo dataInfo) { + + if (StringUtils.isNotBlank(name) + || StringUtils.isNotBlank(issnPrinted) + || StringUtils.isNotBlank(issnOnline) + || StringUtils.isNotBlank(issnLinking)) { + final Journal j = new Journal(); + j.setName(name); + j.setIssnPrinted(issnPrinted); + j.setIssnOnline(issnOnline); + j.setIssnLinking(issnLinking); + j.setEp(ep); + j.setIss(iss); + j.setSp(sp); + j.setVol(vol); + j.setEdition(edition); + j.setConferenceplace(conferenceplace); + j.setConferencedate(conferencedate); + j.setDataInfo(dataInfo); + return j; + } else { + return null; + } + } + + public static DataInfo dataInfo( + final Boolean deletedbyinference, + final String inferenceprovenance, + final Boolean inferred, + final Boolean invisible, + final Qualifier provenanceaction, + final String trust) { + final DataInfo d = new DataInfo(); + d.setDeletedbyinference(deletedbyinference); + d.setInferenceprovenance(inferenceprovenance); + d.setInferred(inferred); + d.setInvisible(invisible); + d.setProvenanceaction(provenanceaction); + d.setTrust(trust); + return d; + } + + public static String createOpenaireId( + final int prefix, + final String originalId, + final boolean to_md5) { + if (StringUtils.isBlank(originalId)) { + return null; + } else if (to_md5) { + final String nsPrefix = StringUtils.substringBefore(originalId, "::"); + final String rest = StringUtils.substringAfter(originalId, "::"); + return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); + } else { + return String.format("%s|%s", prefix, originalId); + } + } + + public static String createOpenaireId( + final String type, + final String originalId, + final boolean to_md5) { + switch (type) { + case "datasource": + return createOpenaireId(10, originalId, to_md5); + case "organization": + return createOpenaireId(20, originalId, to_md5); + case "person": + return createOpenaireId(30, originalId, to_md5); + case "project": + return createOpenaireId(40, originalId, to_md5); + default: + return createOpenaireId(50, originalId, to_md5); + } + } + + public static String asString(final Object o) { + return o == null ? 
"" : o.toString(); + } + + public static Predicate distinctByKey( + final Function keyExtractor) { + final Map seen = new ConcurrentHashMap<>(); + return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; + } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java index a2e64b83b..6c11d1a85 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java @@ -1,2 +1,49 @@ -package eu.dnetlib.dhp.schema.oaf;public class ResultTypeComparator { + +package eu.dnetlib.dhp.schema.oaf; + +import java.util.Comparator; + +import eu.dnetlib.dhp.schema.common.ModelConstants; + +public class ResultTypeComparator implements Comparator { + + @Override + public int compare(Result left, Result right) { + + if (left == null && right == null) + return 0; + if (left == null) + return 1; + if (right == null) + return -1; + + String lClass = left.getResulttype().getClassid(); + String rClass = right.getResulttype().getClassid(); + + if (lClass.equals(rClass)) + return 0; + + if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID)) + return 1; + + if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID)) + return 1; + + if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID)) + return 1; + + if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) + return -1; + if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID)) + return 1; + + // Else (but unlikely), lexicographical ordering will do. 
+		return lClass.compareTo(rClass);
+	}
 }
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index dfbaf3a6c..8872174a5 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
+import java.util.List;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -15,9 +16,15 @@ import org.apache.commons.codec.binary.Hex;
 
 import com.jayway.jsonpath.JsonPath;
 
 import net.minidev.json.JSONArray;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 public class DHPUtils {
 
+	public static <T> Seq<T> toSeq(List<T> list) {
+		return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
+	}
+
 	public static String md5(final String s) {
 		try {
 			final MessageDigest md = MessageDigest.getInstance("MD5");
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index e295b9503..714b35dac 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -3,13 +3,9 @@ package eu.dnetlib.dhp.oa.graph.clean;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.io.BufferedInputStream;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -23,11 +19,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
index 56a4aaf5a..9f06ea056 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
@@ -11,7 +11,6 @@ import org.apache.commons.lang3.StringUtils;
 
 import com.clearspring.analytics.util.Lists;
 
 import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
-import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
index 6b90e9971..1b4226411 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
@@ -1,11 +1,10 @@
-package eu.dnetlib.dhp.oa.graph.fuse;
+package eu.dnetlib.dhp.oa.graph.groupbyid;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
 
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
@@ -17,14 +16,22 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import scala.Tuple2;
 
 public class DispatchEntitiesSparkJob {
 
 	private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
 
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
@@ -40,48 +47,51 @@ public class DispatchEntitiesSparkJob {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String sourcePath = parser.get("sourcePath");
-		final String targetPath = parser.get("graphRawPath");
+		final String entitiesPath = parser.get("entitiesPath");
+		log.info("entitiesPath: {}", entitiesPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		String graphTableClassName = parser.get("graphTableClassName");
+		log.info("graphTableClassName: {}", graphTableClassName);
+
+		Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
 
 		SparkConf conf = new SparkConf();
+		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				removeOutputDir(spark, targetPath);
-				ModelSupport.oafTypes
-					.values()
-					.forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath));
+				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+				dispatchOaf(spark, entityClazz, entitiesPath, outputPath);
 			});
 	}
 
-	private static void processEntity(
+	private static void dispatchOaf(
 		final SparkSession spark,
 		final Class<? extends Oaf> clazz,
 		final String sourcePath,
 		final String targetPath) {
 
-		final String type = clazz.getSimpleName().toLowerCase();
-		log.info("Processing entities ({}) in file: {}", type, sourcePath);
+		log.info("Processing entities ({}) in file: {}", clazz.getName(), sourcePath);
 
 		spark
 			.read()
 			.textFile(sourcePath)
-			.filter((FilterFunction<String>)
value -> isEntityType(value, type))
-			.map(
-				(MapFunction<String, String>) l -> StringUtils.substringAfter(l, "|"),
-				Encoders.STRING())
+			.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
+			.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
 			.write()
 			.option("compression", "gzip")
 			.mode(SaveMode.Overwrite)
-			.text(targetPath + "/" + type);
+			.text(targetPath);
 	}
 
-	private static boolean isEntityType(final String line, final String type) {
-		return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
+	private static boolean isEntityType(final String s, final Class<? extends Oaf> clazz) {
+		return StringUtils.substringBefore(s, "|").equals(clazz.getName());
 	}
 
-	private static void removeOutputDir(SparkSession spark, String path) {
-		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
-	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java
index 7ab8646ec..1d887cecb 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java
@@ -12,9 +12,9 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.expressions.Aggregator;
@@ -22,7 +22,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
@@ -33,19 +36,21 @@ import scala.Tuple2;
 
 /**
  * Groups the graph content by entity identifier to ensure ID uniqueness
  */
-public class GroupEntitiesSparkJob {
+public class GroupEntitiesAndRelationsSparkJob {
 
-	private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
+	private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class);
 
 	private final static String ID_JPATH = "$.id";
 
+	private final static String SOURCE_JPATH = "$.source";
+
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
 	public static void main(String[] args) throws Exception {
 
 		String jsonConfiguration = IOUtils
 			.toString(
-				GroupEntitiesSparkJob.class
+				GroupEntitiesAndRelationsSparkJob.class
 					.getResourceAsStream(
 						"/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json"));
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@@ -72,11 +77,11 @@
 			isSparkSessionManaged,
 			spark -> {
 				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
-				groupEntities(spark, graphInputPath, outputPath);
+				groupEntitiesAndRelations(spark, graphInputPath, outputPath);
 			});
 	}
 
-	private static void groupEntities(
+	private static void groupEntitiesAndRelations(
 		SparkSession spark,
		String inputPath,
 		String outputPath) {
@@ -85,14 +90,19 @@
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 		spark
 			.read()
-			.textFile(toSeq(listEntityPaths(inputPath, sc)))
-			.map((MapFunction<String, Oaf>) s -> parseEntity(s), Encoders.kryo(Oaf.class))
+			.textFile(toSeq(listPaths(inputPath, sc)))
+			.map((MapFunction<String, Oaf>) s -> parseOaf(s), Encoders.kryo(Oaf.class))
+			.filter((FilterFunction<Oaf>) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf)))
 			.groupByKey((MapFunction<Oaf, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
 			.agg(aggregator)
-			.map((MapFunction<Tuple2<String, Oaf>, Oaf>) Tuple2::_2, Encoders.kryo(Oaf.class))
+			.map(
+				(MapFunction<Tuple2<String, Oaf>, String>) t -> t._2().getClass().getName() +
+					"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
+				Encoders.STRING())
 			.write()
+			.option("compression", "gzip")
 			.mode(SaveMode.Overwrite)
-			.save(outputPath);
+			.text(outputPath);
 	}
 
 	public static class GroupingAggregator extends Aggregator<Oaf, Oaf, Oaf> {
@@ -136,51 +146,61 @@
 	}
 
-	private static Oaf parseEntity(String s) {
-		String prefix = StringUtils.substringBefore(jPath(ID_JPATH, s), "|");
-		try {
+	private static Oaf parseOaf(String s) {
+
+		DocumentContext dc = JsonPath
+			.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
+		final String id = dc.read(ID_JPATH);
+		if (StringUtils.isNotBlank(id)) {
+
+			String prefix = StringUtils.substringBefore(id, "|");
 			switch (prefix) {
 				case "10":
-					return OBJECT_MAPPER.readValue(s, Datasource.class);
+					return parse(s, Datasource.class);
 				case "20":
-					return OBJECT_MAPPER.readValue(s, Organization.class);
+					return parse(s, Organization.class);
 				case "40":
-					return OBJECT_MAPPER.readValue(s, Project.class);
+					return parse(s, Project.class);
 				case "50":
-					String resultType = jPath("$.resulttype.classid", s);
+					String resultType = dc.read("$.resulttype.classid");
 					switch (resultType) {
 						case "publication":
-							return OBJECT_MAPPER.readValue(s, Publication.class);
+							return parse(s, Publication.class);
 						case "dataset":
-							return OBJECT_MAPPER.readValue(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+							return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
 						case "software":
-							return OBJECT_MAPPER.readValue(s, Software.class);
+							return parse(s, Software.class);
 						case "other":
-							return OBJECT_MAPPER.readValue(s, OtherResearchProduct.class);
+							return parse(s, OtherResearchProduct.class);
 						default:
 							throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
 					}
 				default:
 					throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
 			}
+		} else {
+			String source = dc.read(SOURCE_JPATH);
+			if (StringUtils.isNotBlank(source)) {
+				return parse(s, Relation.class);
+			} else {
+				throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
+			}
+		}
+	}
+
+	private static <T extends Oaf> Oaf parse(String s, Class<T> clazz) {
+		try {
+			return OBJECT_MAPPER.readValue(s, clazz);
 		} catch (IOException e) {
 			throw new RuntimeException(e);
 		}
 	}
 
-	private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
+	private static List<String> listPaths(String inputPath, JavaSparkContext sc) {
 		return HdfsSupport
 			.listFiles(inputPath, sc.hadoopConfiguration())
 			.stream()
-			.filter(p -> !p.contains("relation"))
 			.collect(Collectors.toList());
 	}
 
-	private static String jPath(final String path, final String json) {
-		Object o = JsonPath.read(json, path);
-		if (o instanceof String)
-			return (String) o;
-		throw new IllegalStateException(String.format("could
not extract '%s' from:\n%s", path, json)); - } - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 5b6ae72f1..da4b5e324 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -1,8 +1,8 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; import java.util.*; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 99aa9c3f0..cfd190670 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -4,9 +4,11 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -18,7 +20,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public class GenerateEntitiesApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, targetPath); + HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); generateEntities(spark, vocs, sourcePaths, targetPath); }); } @@ -82,7 +83,7 @@ public class GenerateEntitiesApplication { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final List existingSourcePaths = Arrays .stream(sourcePaths.split(",")) - .filter(p -> exists(sc, p)) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .collect(Collectors.toList()); log.info("Generate entities from files:"); @@ -160,17 +161,4 @@ public class GenerateEntitiesApplication { } } - private static boolean exists(final JavaSparkContext context, final String pathToFile) { - try { - final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration()); - final Path path = new Path(pathToFile); - return hdfs.exists(path); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - private static void removeOutputDir(final SparkSession spark, final String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 6365a1db9..f6f0e0a36 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -1,15 +1,6 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION; import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; @@ -32,6 +23,7 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT; import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT; import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; import java.io.Closeable; import java.io.IOException; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java index dea80fabd..e62bc0790 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; import java.util.ArrayList; import java.util.List; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index 6fe7bb971..7124684d5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field; -import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; +import static 
eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; import java.util.ArrayList; import java.util.Arrays; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java index 9bf198c8b..bfc4fd6f1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java @@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils; import com.google.common.collect.Maps; +import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.Qualifier; public class Vocabulary implements Serializable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java index 334339d3b..32452bdc5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java @@ -7,6 +7,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json index 7d995f39a..a75ff3653 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json @@ -6,15 +6,21 @@ "paramRequired": false }, { - "paramName": "s", - "paramLongName": "sourcePath", - "paramDescription": "the source path", + "paramName": "ep", + "paramLongName": "entitiesPath", + "paramDescription": "the entities path", "paramRequired": true }, { "paramName": "g", - "paramLongName": "graphRawPath", - "paramDescription": "the path of the graph Raw in hdfs", + "paramLongName": "outputPath", + "paramDescription": "the output path to store the dispatched entities", + "paramRequired": true + }, + { + "paramName": "class", + "paramLongName": "graphTableClassName", + "paramDescription": "class name modelling the graph table", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml index 3715d097d..70b2ea68e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + @@ -46,18 +46,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn 
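<!-- This spark action runs GroupEntitiesAndRelationsSparkJob: it merges all records sharing
     the same id and writes them as "className|json" text lines under
     ${workingDir}/grouped_entities, which the dispatch actions that follow split back into
     one output per entity type. -->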
cluster - Fuse graph entities by ID - eu.dnetlib.dhp.oa.graph.fuse.FuseGraphResultsSparkJob + group graph entities and relations + eu.dnetlib.dhp.oa.graph.groupbyid.GroupEntitiesAndRelationsSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -70,18 +70,29 @@ --conf spark.sql.shuffle.partitions=7680 --graphInputPath${graphInputPath} - --outputPath${workingDir}/entities + --outputPath${workingDir}/grouped_entities - + - + + + + + + + + + + + + yarn cluster - Merge datasets - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch publications + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -93,22 +104,45 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/dataset - --prodInputPath${prodInputGgraphPath}/dataset - --outputPath${graphOutputPath}/dataset + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + + + + + + + + yarn + cluster + Dispatch datasets + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --priority${priority} - + - + yarn cluster - Merge otherresearchproducts - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch softwares + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -120,49 +154,20 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/otherresearchproduct - --prodInputPath${prodInputGgraphPath}/otherresearchproduct - --outputPath${graphOutputPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --priority${priority} - - - - - - - - yarn - cluster - Merge softwares - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --betaInputPath${betaInputGgraphPath}/software - --prodInputPath${prodInputGgraphPath}/software - --outputPath${graphOutputPath}/software + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --priority${priority} - + - + yarn cluster - Merge datasources - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch 
otherresearchproducts + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -174,22 +179,45 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/datasource - --prodInputPath${prodInputGgraphPath}/datasource - --outputPath${graphOutputPath}/datasource + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + + + + + + + + yarn + cluster + Dispatch datasources + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --priority${priority} - + - + yarn cluster - Merge organizations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch organizations + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -201,22 +229,20 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/organization - --prodInputPath${prodInputGgraphPath}/organization - --outputPath${graphOutputPath}/organization + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --priority${priority} - + - + yarn cluster - Merge projects - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch project + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -228,22 +254,20 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/project - --prodInputPath${prodInputGgraphPath}/project - --outputPath${graphOutputPath}/project + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --priority${priority} - + - + yarn cluster - Merge relations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + Dispatch relations + eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -255,17 +279,15 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/relation - --prodInputPath${prodInputGgraphPath}/relation - --outputPath${graphOutputPath}/relation + --entitiesPath${workingDir}/grouped_entities + --outputPath${outputPath}/relation --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation - --priority${priority} - + - + \ No newline at end of file diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 604f515a5..5af117d68 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -2,11 +2,11 @@ - betaInputGgraphPath + betaInputGraphPath the beta graph root path - prodInputGgraphPath + prodInputGraphPath the production graph root path diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index f663d6095..9cf75f208 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -27,14 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.*; @ExtendWith(MockitoExtension.class) public class MigrateDbEntitiesApplicationTest { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index d8eba31b6..b44ed7446 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -2,12 +2,11 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; @@ -28,13 +27,11 @@ import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; -import scala.collection.JavaConverters; -import scala.collection.Seq; /** * XmlConverterJob converts the JoinedEntities as XML records @@ -43,8 +40,6 @@ public class XmlConverterJob { private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String schemaLocation = 
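// XML schema location declared by the serialized oaf records: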
"https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static void main(String[] args) throws Exception { @@ -129,10 +124,6 @@ public class XmlConverterJob { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static Seq toSeq(List list) { - return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); - } - private static Map prepareAccumulators(SparkContext sc) { Map accumulators = Maps.newHashMap(); accumulators