From 13e36a4da0297cc6186e9e014cdec7c0e59a284d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Nov 2020 10:05:02 +0100 Subject: [PATCH] WIP: added oozie workflow for grouping graph entities by id --- .../dhp/schema/oaf/OafMapperUtils.java | 2 + .../dhp/schema/oaf/ResultTypeComparator.java | 2 + .../groupbyid/DispatchEntitiesSparkJob.java | 87 ++++++ .../GroupEntitiesAndRelationsSparkJob.java | 186 ++++++++++++ ...kJob.java => MergeGraphTableSparkJob.java} | 4 +- .../raw/GenerateEntitiesApplication.java | 31 +- .../oa/graph/raw/common/OafMapperUtils.java | 273 ------------------ .../dispatch_entities_bytype_parameters.json | 20 ++ .../group_graph_entities_parameters.json | 20 ++ .../groupbyid/oozie_app/config-default.xml | 18 ++ .../oa/graph/groupbyid/oozie_app/workflow.xml | 271 +++++++++++++++++ .../dhp/oa/graph/merge/oozie_app/workflow.xml | 16 +- ....java => MergeGraphTableSparkJobTest.java} | 14 +- .../raw/GenerateEntitiesApplicationTest.java | 99 +++++++ 14 files changed, 727 insertions(+), 316 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/{MergeGraphSparkJob.java => MergeGraphTableSparkJob.java} (98%) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml rename dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/{MergeGraphSparkJobTest.java => MergeGraphTableSparkJobTest.java} (90%) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java new file mode 100644 index 000000000..6ba918069 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java @@ -0,0 +1,2 @@ +package eu.dnetlib.dhp.schema.oaf;public class OafMapperUtils { +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java new file mode 100644 index 000000000..a2e64b83b --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ResultTypeComparator.java @@ -0,0 +1,2 @@ +package eu.dnetlib.dhp.schema.oaf;public class ResultTypeComparator { +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java new file mode 100644 index 000000000..6b90e9971 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java @@ -0,0 +1,87 @@ + +package eu.dnetlib.dhp.oa.graph.fuse; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class DispatchEntitiesSparkJob { + + private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + MigrateMongoMdstoresApplication.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String sourcePath = parser.get("sourcePath"); + final String targetPath = parser.get("graphRawPath"); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, targetPath); + ModelSupport.oafTypes + .values() + .forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath)); + }); + } + + private static void processEntity( + final SparkSession spark, + final Class clazz, + final String sourcePath, + final String targetPath) { + final String type = clazz.getSimpleName().toLowerCase(); + + log.info("Processing entities ({}) in file: {}", type, sourcePath); + + spark + .read() + .textFile(sourcePath) + .filter((FilterFunction) value -> isEntityType(value, type)) + .map( + (MapFunction) l -> StringUtils.substringAfter(l, "|"), + Encoders.STRING()) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .text(targetPath + "/" + type); + } + + private static boolean isEntityType(final String line, final String type) { + return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java new file mode 100644 index 000000000..7ab8646ec --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java @@ -0,0 +1,186 @@ + +package eu.dnetlib.dhp.oa.graph.groupbyid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +/** + * Groups the graph content by entity identifier to ensure ID uniqueness + */ +public class GroupEntitiesSparkJob { + + private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); + + private final static String ID_JPATH = "$.id"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GroupEntitiesSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String graphInputPath = parser.get("graphInputPath"); + log.info("graphInputPath: {}", graphInputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + groupEntities(spark, graphInputPath, outputPath); + }); + } + + private static void groupEntities( + SparkSession spark, + String inputPath, + String outputPath) { + + TypedColumn aggregator = new GroupingAggregator().toColumn(); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + spark + .read() + .textFile(toSeq(listEntityPaths(inputPath, sc))) + .map((MapFunction) s -> parseEntity(s), Encoders.kryo(Oaf.class)) + .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) + .agg(aggregator) + .map((MapFunction, Oaf>) Tuple2::_2, Encoders.kryo(Oaf.class)) + .write() + .mode(SaveMode.Overwrite) + .save(outputPath); + } + + public static class GroupingAggregator extends Aggregator { + + @Override + public Oaf zero() { + return null; + } + + @Override + public Oaf reduce(Oaf b, Oaf a) { + return mergeAndGet(b, a); + } + + private Oaf mergeAndGet(Oaf b, Oaf a) { + if (Objects.nonNull(a) && Objects.nonNull(b)) { + return OafMapperUtils.merge(b, a); + } + return Objects.isNull(a) ? b : a; + } + + @Override + public Oaf merge(Oaf b, Oaf a) { + return mergeAndGet(b, a); + } + + @Override + public Oaf finish(Oaf j) { + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(Oaf.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(Oaf.class); + } + + } + + private static Oaf parseEntity(String s) { + String prefix = StringUtils.substringBefore(jPath(ID_JPATH, s), "|"); + try { + switch (prefix) { + case "10": + return OBJECT_MAPPER.readValue(s, Datasource.class); + case "20": + return OBJECT_MAPPER.readValue(s, Organization.class); + case "40": + return OBJECT_MAPPER.readValue(s, Project.class); + case "50": + String resultType = jPath("$.resulttype.classid", s); + switch (resultType) { + case "publication": + return OBJECT_MAPPER.readValue(s, Publication.class); + case "dataset": + return OBJECT_MAPPER.readValue(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); + case "software": + return OBJECT_MAPPER.readValue(s, Software.class); + case "other": + return OBJECT_MAPPER.readValue(s, OtherResearchProduct.class); + default: + throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); + } + default: + throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static List listEntityPaths(String inputPath, JavaSparkContext sc) { + return HdfsSupport + .listFiles(inputPath, sc.hadoopConfiguration()) + .stream() + .filter(p -> !p.contains("relation")) + .collect(Collectors.toList()); + } + + private static String jPath(final String path, final String json) { + Object o = JsonPath.read(json, path); + if (o instanceof String) + return (String) o; + throw new IllegalStateException(String.format("could not extract '%s' from:\n%s", path, json)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java similarity index 98% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index 037683604..e53f4ca30 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -33,9 +33,9 @@ import scala.Tuple2; * are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined * by eu.dnetlib.dhp.schema.common.ModelSupport#idFn() */ -public class MergeGraphSparkJob { +public class MergeGraphTableSparkJob { - private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 3568dc52a..99aa9c3f0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -4,11 +4,9 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -20,6 +18,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,16 +28,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; @@ -113,7 +103,7 @@ public class GenerateEntitiesApplication { inputRdd .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) - .reduceByKey((o1, o2) -> merge(o1, o2)) + .reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2)) .map(Tuple2::_2) .map( oaf -> oaf.getClass().getSimpleName().toLowerCase() @@ -122,17 +112,6 @@ public class GenerateEntitiesApplication { .saveAsTextFile(targetPath, GzipCodec.class); } - private static Oaf merge(final Oaf o1, final Oaf o2) { - if (ModelSupport.isSubClass(o1, OafEntity.class)) { - ((OafEntity) o1).mergeFrom((OafEntity) o2); - } else if (ModelSupport.isSubClass(o1, Relation.class)) { - ((Relation) o1).mergeFrom((Relation) o2); - } else { - throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); - } - return o1; - } - private static List convertToListOaf( final String id, final String s, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java deleted file mode 100644 index 84b29e3d4..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java +++ /dev/null @@ -1,273 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.raw.common; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; - -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.ExtraInfo; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.Journal; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.OAIProvenance; -import eu.dnetlib.dhp.schema.oaf.OriginDescription; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.utils.DHPUtils; - -public class OafMapperUtils { - - public static KeyValue keyValue(final String k, final String v) { - final KeyValue kv = new KeyValue(); - kv.setKey(k); - kv.setValue(v); - return kv; - } - - public static List listKeyValues(final String... s) { - if (s.length % 2 > 0) { - throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); - } - - final List list = new ArrayList<>(); - for (int i = 0; i < s.length; i += 2) { - list.add(keyValue(s[i], s[i + 1])); - } - return list; - } - - public static Field field(final T value, final DataInfo info) { - if (value == null || StringUtils.isBlank(value.toString())) { - return null; - } - - final Field field = new Field<>(); - field.setValue(value); - field.setDataInfo(info); - return field; - } - - public static List> listFields(final DataInfo info, final String... values) { - return Arrays - .stream(values) - .map(v -> field(v, info)) - .filter(Objects::nonNull) - .filter(distinctByKey(f -> f.getValue())) - .collect(Collectors.toList()); - } - - public static List> listFields(final DataInfo info, final List values) { - return values - .stream() - .map(v -> field(v, info)) - .filter(Objects::nonNull) - .filter(distinctByKey(f -> f.getValue())) - .collect(Collectors.toList()); - } - - public static Qualifier unknown(final String schemeid, final String schemename) { - return qualifier("UNKNOWN", "Unknown", schemeid, schemename); - } - - public static Qualifier qualifier( - final String classid, - final String classname, - final String schemeid, - final String schemename) { - final Qualifier q = new Qualifier(); - q.setClassid(classid); - q.setClassname(classname); - q.setSchemeid(schemeid); - q.setSchemename(schemename); - return q; - } - - public static StructuredProperty structuredProperty( - final String value, - final String classid, - final String classname, - final String schemeid, - final String schemename, - final DataInfo dataInfo) { - - return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo); - } - - public static StructuredProperty structuredProperty( - final String value, - final Qualifier qualifier, - final DataInfo dataInfo) { - if (value == null) { - return null; - } - final StructuredProperty sp = new StructuredProperty(); - sp.setValue(value); - sp.setQualifier(qualifier); - sp.setDataInfo(dataInfo); - return sp; - } - - public static ExtraInfo extraInfo( - final String name, - final String value, - final String typology, - final String provenance, - final String trust) { - final ExtraInfo info = new ExtraInfo(); - info.setName(name); - info.setValue(value); - info.setTypology(typology); - info.setProvenance(provenance); - info.setTrust(trust); - return info; - } - - public static OAIProvenance oaiIProvenance( - final String identifier, - final String baseURL, - final String metadataNamespace, - final Boolean altered, - final String datestamp, - final String harvestDate) { - - final OriginDescription desc = new OriginDescription(); - desc.setIdentifier(identifier); - desc.setBaseURL(baseURL); - desc.setMetadataNamespace(metadataNamespace); - desc.setAltered(altered); - desc.setDatestamp(datestamp); - desc.setHarvestDate(harvestDate); - - final OAIProvenance p = new OAIProvenance(); - p.setOriginDescription(desc); - - return p; - } - - public static Journal journal( - final String name, - final String issnPrinted, - final String issnOnline, - final String issnLinking, - final DataInfo dataInfo) { - return journal( - name, - issnPrinted, - issnOnline, - issnLinking, - null, - null, - null, - null, - null, - null, - null, - dataInfo); - } - - public static Journal journal( - final String name, - final String issnPrinted, - final String issnOnline, - final String issnLinking, - final String ep, - final String iss, - final String sp, - final String vol, - final String edition, - final String conferenceplace, - final String conferencedate, - final DataInfo dataInfo) { - - if (StringUtils.isNotBlank(name) - || StringUtils.isNotBlank(issnPrinted) - || StringUtils.isNotBlank(issnOnline) - || StringUtils.isNotBlank(issnLinking)) { - final Journal j = new Journal(); - j.setName(name); - j.setIssnPrinted(issnPrinted); - j.setIssnOnline(issnOnline); - j.setIssnLinking(issnLinking); - j.setEp(ep); - j.setIss(iss); - j.setSp(sp); - j.setVol(vol); - j.setEdition(edition); - j.setConferenceplace(conferenceplace); - j.setConferencedate(conferencedate); - j.setDataInfo(dataInfo); - return j; - } else { - return null; - } - } - - public static DataInfo dataInfo( - final Boolean deletedbyinference, - final String inferenceprovenance, - final Boolean inferred, - final Boolean invisible, - final Qualifier provenanceaction, - final String trust) { - final DataInfo d = new DataInfo(); - d.setDeletedbyinference(deletedbyinference); - d.setInferenceprovenance(inferenceprovenance); - d.setInferred(inferred); - d.setInvisible(invisible); - d.setProvenanceaction(provenanceaction); - d.setTrust(trust); - return d; - } - - public static String createOpenaireId( - final int prefix, - final String originalId, - final boolean to_md5) { - if (StringUtils.isBlank(originalId)) { - return null; - } else if (to_md5) { - final String nsPrefix = StringUtils.substringBefore(originalId, "::"); - final String rest = StringUtils.substringAfter(originalId, "::"); - return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); - } else { - return String.format("%s|%s", prefix, originalId); - } - } - - public static String createOpenaireId( - final String type, - final String originalId, - final boolean to_md5) { - switch (type) { - case "datasource": - return createOpenaireId(10, originalId, to_md5); - case "organization": - return createOpenaireId(20, originalId, to_md5); - case "person": - return createOpenaireId(30, originalId, to_md5); - case "project": - return createOpenaireId(40, originalId, to_md5); - default: - return createOpenaireId(50, originalId, to_md5); - } - } - - public static String asString(final Object o) { - return o == null ? "" : o.toString(); - } - - public static Predicate distinctByKey( - final Function keyExtractor) { - final Map seen = new ConcurrentHashMap<>(); - return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null; - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json new file mode 100644 index 000000000..7d995f39a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source path", + "paramRequired": true + }, + { + "paramName": "g", + "paramLongName": "graphRawPath", + "paramDescription": "the path of the graph Raw in hdfs", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json new file mode 100644 index 000000000..e65acb3c4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "gin", + "paramLongName": "graphInputPath", + "paramDescription": "the graph root path", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the output merged graph root path", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml new file mode 100644 index 000000000..3715d097d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml @@ -0,0 +1,271 @@ + + + + + graphInputPath + the graph root input path + + + outputPath + the graph root output path + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Fuse graph entities by ID + eu.dnetlib.dhp.oa.graph.fuse.FuseGraphResultsSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --graphInputPath${graphInputPath} + --outputPath${workingDir}/entities + + + + + + + + yarn + cluster + Merge datasets + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/dataset + --prodInputPath${prodInputGgraphPath}/dataset + --outputPath${graphOutputPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --priority${priority} + + + + + + + + yarn + cluster + Merge otherresearchproducts + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/otherresearchproduct + --prodInputPath${prodInputGgraphPath}/otherresearchproduct + --outputPath${graphOutputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --priority${priority} + + + + + + + + yarn + cluster + Merge softwares + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/software + --prodInputPath${prodInputGgraphPath}/software + --outputPath${graphOutputPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --priority${priority} + + + + + + + + yarn + cluster + Merge datasources + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/datasource + --prodInputPath${prodInputGgraphPath}/datasource + --outputPath${graphOutputPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --priority${priority} + + + + + + + + yarn + cluster + Merge organizations + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/organization + --prodInputPath${prodInputGgraphPath}/organization + --outputPath${graphOutputPath}/organization + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --priority${priority} + + + + + + + + yarn + cluster + Merge projects + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/project + --prodInputPath${prodInputGgraphPath}/project + --outputPath${graphOutputPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --priority${priority} + + + + + + + + yarn + cluster + Merge relations + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --betaInputPath${betaInputGgraphPath}/relation + --prodInputPath${prodInputGgraphPath}/relation + --outputPath${graphOutputPath}/relation + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation + --priority${priority} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 07a125fb6..604f515a5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -76,7 +76,7 @@ yarn cluster Merge publications - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -103,7 +103,7 @@ yarn cluster Merge datasets - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -130,7 +130,7 @@ yarn cluster Merge otherresearchproducts - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -157,7 +157,7 @@ yarn cluster Merge softwares - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -184,7 +184,7 @@ yarn cluster Merge datasources - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -211,7 +211,7 @@ yarn cluster Merge organizations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -238,7 +238,7 @@ yarn cluster Merge projects - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -265,7 +265,7 @@ yarn cluster Merge relations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java similarity index 90% rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java index 28e8e5abc..0089811cf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java @@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Datasource; -public class MergeGraphSparkJobTest { +public class MergeGraphTableSparkJobTest { private ObjectMapper mapper; @@ -28,7 +28,7 @@ public class MergeGraphSparkJobTest { public void testMergeDatasources() throws IOException { assertEquals( "openaire-cris_1.1", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_cris.json"), d("datasource_UNKNOWN.json")) @@ -36,7 +36,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "openaire-cris_1.1", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_UNKNOWN.json"), d("datasource_cris.json")) @@ -44,7 +44,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "driver-openaire2.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_native.json"), d("datasource_driver-openaire2.0.json")) @@ -52,7 +52,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "driver-openaire2.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_driver-openaire2.0.json"), d("datasource_native.json")) @@ -60,7 +60,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "openaire4.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_notCompatible.json"), d("datasource_openaire4.0.json")) @@ -68,7 +68,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "notCompatible", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_notCompatible.json"), d("datasource_UNKNOWN.json")) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java new file mode 100644 index 000000000..705f1dddb --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java @@ -0,0 +1,99 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest; +import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +@ExtendWith(MockitoExtension.class) +public class GenerateEntitiesApplicationTest { + + @Mock + private ISLookUpService isLookUpService; + + @Mock + private VocabularyGroup vocs; + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs()); + lenient() + .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY)) + .thenReturn(synonyms()); + + vocs = VocabularyGroup.loadVocsFromIS(isLookUpService); + } + + @Test + public void testMergeResult() throws IOException { + Result publication = getResult("oaf_record.xml", Publication.class); + Result dataset = getResult("odf_dataset.xml", Dataset.class); + Result software = getResult("odf_software.xml", Software.class); + Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class); + + verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID); + verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID); + + verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID); + verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID); + + verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID); + verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID); + + verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID); + verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID); + + verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID); + verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID); + + verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID); + verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID); + } + + protected void verifyMerge(Result publication, Result dataset, Class clazz, + String resultType) { + final Result merge = OafMapperUtils.mergeResults(publication, dataset); + assertTrue(clazz.isAssignableFrom(merge.getClass())); + assertEquals(resultType, merge.getResulttype().getClassid()); + } + + protected Result getResult(String xmlFileName, Class clazz) throws IOException { + final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName)); + return new OdfToOafMapper(vocs, false) + .processMdRecord(xml) + .stream() + .filter(s -> clazz.isAssignableFrom(s.getClass())) + .map(s -> (Result) s) + .findFirst() + .get(); + } + + private List vocs() throws IOException { + return IOUtils + .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); + } + + private List synonyms() throws IOException { + return IOUtils + .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); + } + +}