WIP: added oozie workflow for grouping graph entities by id

This commit is contained in:
Claudio Atzori 2020-11-13 10:05:02 +01:00
parent 75324ae58a
commit 13e36a4da0
14 changed files with 727 additions and 316 deletions

View File

@ -0,0 +1,2 @@
package eu.dnetlib.dhp.schema.oaf;public class OafMapperUtils {
}

View File

@ -0,0 +1,2 @@
package eu.dnetlib.dhp.schema.oaf;public class ResultTypeComparator {
}

View File

@ -0,0 +1,87 @@
package eu.dnetlib.dhp.oa.graph.fuse;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateMongoMdstoresApplication.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String sourcePath = parser.get("sourcePath");
final String targetPath = parser.get("graphRawPath");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, targetPath);
ModelSupport.oafTypes
.values()
.forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath));
});
}
private static <T extends Oaf> void processEntity(
final SparkSession spark,
final Class<T> clazz,
final String sourcePath,
final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase();
log.info("Processing entities ({}) in file: {}", type, sourcePath);
spark
.read()
.textFile(sourcePath)
.filter((FilterFunction<String>) value -> isEntityType(value, type))
.map(
(MapFunction<String, String>) l -> StringUtils.substringAfter(l, "|"),
Encoders.STRING())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(targetPath + "/" + type);
}
private static boolean isEntityType(final String line, final String type) {
return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -0,0 +1,186 @@
package eu.dnetlib.dhp.oa.graph.groupbyid;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* Groups the graph content by entity identifier to ensure ID uniqueness
*/
public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private final static String ID_JPATH = "$.id";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GroupEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath);
});
}
private static void groupEntities(
SparkSession spark,
String inputPath,
String outputPath) {
TypedColumn<Oaf, Oaf> aggregator = new GroupingAggregator().toColumn();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark
.read()
.textFile(toSeq(listEntityPaths(inputPath, sc)))
.map((MapFunction<String, Oaf>) s -> parseEntity(s), Encoders.kryo(Oaf.class))
.groupByKey((MapFunction<Oaf, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
.agg(aggregator)
.map((MapFunction<Tuple2<String, Oaf>, Oaf>) Tuple2::_2, Encoders.kryo(Oaf.class))
.write()
.mode(SaveMode.Overwrite)
.save(outputPath);
}
public static class GroupingAggregator extends Aggregator<Oaf, Oaf, Oaf> {
@Override
public Oaf zero() {
return null;
}
@Override
public Oaf reduce(Oaf b, Oaf a) {
return mergeAndGet(b, a);
}
private Oaf mergeAndGet(Oaf b, Oaf a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.merge(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public Oaf merge(Oaf b, Oaf a) {
return mergeAndGet(b, a);
}
@Override
public Oaf finish(Oaf j) {
return j;
}
@Override
public Encoder<Oaf> bufferEncoder() {
return Encoders.kryo(Oaf.class);
}
@Override
public Encoder<Oaf> outputEncoder() {
return Encoders.kryo(Oaf.class);
}
}
private static <T extends Oaf> Oaf parseEntity(String s) {
String prefix = StringUtils.substringBefore(jPath(ID_JPATH, s), "|");
try {
switch (prefix) {
case "10":
return OBJECT_MAPPER.readValue(s, Datasource.class);
case "20":
return OBJECT_MAPPER.readValue(s, Organization.class);
case "40":
return OBJECT_MAPPER.readValue(s, Project.class);
case "50":
String resultType = jPath("$.resulttype.classid", s);
switch (resultType) {
case "publication":
return OBJECT_MAPPER.readValue(s, Publication.class);
case "dataset":
return OBJECT_MAPPER.readValue(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return OBJECT_MAPPER.readValue(s, Software.class);
case "other":
return OBJECT_MAPPER.readValue(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream()
.filter(p -> !p.contains("relation"))
.collect(Collectors.toList());
}
private static String jPath(final String path, final String json) {
Object o = JsonPath.read(json, path);
if (o instanceof String)
return (String) o;
throw new IllegalStateException(String.format("could not extract '%s' from:\n%s", path, json));
}
}

View File

@ -33,9 +33,9 @@ import scala.Tuple2;
* are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined * are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined
* by eu.dnetlib.dhp.schema.common.ModelSupport#idFn() * by eu.dnetlib.dhp.schema.common.ModelSupport#idFn()
*/ */
public class MergeGraphSparkJob { public class MergeGraphTableSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class); private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

View File

@ -4,11 +4,9 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -20,6 +18,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -29,16 +28,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2; import scala.Tuple2;
@ -113,7 +103,7 @@ public class GenerateEntitiesApplication {
inputRdd inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey((o1, o2) -> merge(o1, o2)) .reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2))
.map(Tuple2::_2) .map(Tuple2::_2)
.map( .map(
oaf -> oaf.getClass().getSimpleName().toLowerCase() oaf -> oaf.getClass().getSimpleName().toLowerCase()
@ -122,17 +112,6 @@ public class GenerateEntitiesApplication {
.saveAsTextFile(targetPath, GzipCodec.class); .saveAsTextFile(targetPath, GzipCodec.class);
} }
private static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
((OafEntity) o1).mergeFrom((OafEntity) o2);
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
}
private static List<Oaf> convertToListOaf( private static List<Oaf> convertToListOaf(
final String id, final String id,
final String s, final String s,

View File

@ -1,273 +0,0 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
public static KeyValue keyValue(final String k, final String v) {
final KeyValue kv = new KeyValue();
kv.setKey(k);
kv.setValue(v);
return kv;
}
public static List<KeyValue> listKeyValues(final String... s) {
if (s.length % 2 > 0) {
throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)");
}
final List<KeyValue> list = new ArrayList<>();
for (int i = 0; i < s.length; i += 2) {
list.add(keyValue(s[i], s[i + 1]));
}
return list;
}
public static <T> Field<T> field(final T value, final DataInfo info) {
if (value == null || StringUtils.isBlank(value.toString())) {
return null;
}
final Field<T> field = new Field<>();
field.setValue(value);
field.setDataInfo(info);
return field;
}
public static List<Field<String>> listFields(final DataInfo info, final String... values) {
return Arrays
.stream(values)
.map(v -> field(v, info))
.filter(Objects::nonNull)
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
public static List<Field<String>> listFields(final DataInfo info, final List<String> values) {
return values
.stream()
.map(v -> field(v, info))
.filter(Objects::nonNull)
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
public static Qualifier unknown(final String schemeid, final String schemename) {
return qualifier("UNKNOWN", "Unknown", schemeid, schemename);
}
public static Qualifier qualifier(
final String classid,
final String classname,
final String schemeid,
final String schemename) {
final Qualifier q = new Qualifier();
q.setClassid(classid);
q.setClassname(classname);
q.setSchemeid(schemeid);
q.setSchemename(schemename);
return q;
}
public static StructuredProperty structuredProperty(
final String value,
final String classid,
final String classname,
final String schemeid,
final String schemename,
final DataInfo dataInfo) {
return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
}
public static StructuredProperty structuredProperty(
final String value,
final Qualifier qualifier,
final DataInfo dataInfo) {
if (value == null) {
return null;
}
final StructuredProperty sp = new StructuredProperty();
sp.setValue(value);
sp.setQualifier(qualifier);
sp.setDataInfo(dataInfo);
return sp;
}
public static ExtraInfo extraInfo(
final String name,
final String value,
final String typology,
final String provenance,
final String trust) {
final ExtraInfo info = new ExtraInfo();
info.setName(name);
info.setValue(value);
info.setTypology(typology);
info.setProvenance(provenance);
info.setTrust(trust);
return info;
}
public static OAIProvenance oaiIProvenance(
final String identifier,
final String baseURL,
final String metadataNamespace,
final Boolean altered,
final String datestamp,
final String harvestDate) {
final OriginDescription desc = new OriginDescription();
desc.setIdentifier(identifier);
desc.setBaseURL(baseURL);
desc.setMetadataNamespace(metadataNamespace);
desc.setAltered(altered);
desc.setDatestamp(datestamp);
desc.setHarvestDate(harvestDate);
final OAIProvenance p = new OAIProvenance();
p.setOriginDescription(desc);
return p;
}
public static Journal journal(
final String name,
final String issnPrinted,
final String issnOnline,
final String issnLinking,
final DataInfo dataInfo) {
return journal(
name,
issnPrinted,
issnOnline,
issnLinking,
null,
null,
null,
null,
null,
null,
null,
dataInfo);
}
public static Journal journal(
final String name,
final String issnPrinted,
final String issnOnline,
final String issnLinking,
final String ep,
final String iss,
final String sp,
final String vol,
final String edition,
final String conferenceplace,
final String conferencedate,
final DataInfo dataInfo) {
if (StringUtils.isNotBlank(name)
|| StringUtils.isNotBlank(issnPrinted)
|| StringUtils.isNotBlank(issnOnline)
|| StringUtils.isNotBlank(issnLinking)) {
final Journal j = new Journal();
j.setName(name);
j.setIssnPrinted(issnPrinted);
j.setIssnOnline(issnOnline);
j.setIssnLinking(issnLinking);
j.setEp(ep);
j.setIss(iss);
j.setSp(sp);
j.setVol(vol);
j.setEdition(edition);
j.setConferenceplace(conferenceplace);
j.setConferencedate(conferencedate);
j.setDataInfo(dataInfo);
return j;
} else {
return null;
}
}
public static DataInfo dataInfo(
final Boolean deletedbyinference,
final String inferenceprovenance,
final Boolean inferred,
final Boolean invisible,
final Qualifier provenanceaction,
final String trust) {
final DataInfo d = new DataInfo();
d.setDeletedbyinference(deletedbyinference);
d.setInferenceprovenance(inferenceprovenance);
d.setInferred(inferred);
d.setInvisible(invisible);
d.setProvenanceaction(provenanceaction);
d.setTrust(trust);
return d;
}
public static String createOpenaireId(
final int prefix,
final String originalId,
final boolean to_md5) {
if (StringUtils.isBlank(originalId)) {
return null;
} else if (to_md5) {
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
final String rest = StringUtils.substringAfter(originalId, "::");
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
} else {
return String.format("%s|%s", prefix, originalId);
}
}
public static String createOpenaireId(
final String type,
final String originalId,
final boolean to_md5) {
switch (type) {
case "datasource":
return createOpenaireId(10, originalId, to_md5);
case "organization":
return createOpenaireId(20, originalId, to_md5);
case "person":
return createOpenaireId(30, originalId, to_md5);
case "project":
return createOpenaireId(40, originalId, to_md5);
default:
return createOpenaireId(50, originalId, to_md5);
}
}
public static String asString(final Object o) {
return o == null ? "" : o.toString();
}
public static <T> Predicate<T> distinctByKey(
final Function<? super T, ?> keyExtractor) {
final Map<Object, Boolean> seen = new ConcurrentHashMap<>();
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "g",
"paramLongName": "graphRawPath",
"paramDescription": "the path of the graph Raw in hdfs",
"paramRequired": true
}
]

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "gin",
"paramLongName": "graphInputPath",
"paramDescription": "the graph root path",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramRequired": true
}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,271 @@
<workflow-app name="fuse graph entities by id" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphInputPath</name>
<description>the graph root input path</description>
</property>
<property>
<name>outputPath</name>
<description>the graph root output path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="fuse_graph"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="fuse_graph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fuse graph entities by ID</name>
<class>eu.dnetlib.dhp.oa.graph.fuse.FuseGraphResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/entities</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasets</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasources</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge organizations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge projects</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge relations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<join name="wait_merge" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -76,7 +76,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge publications</name> <name>Merge publications</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -103,7 +103,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge datasets</name> <name>Merge datasets</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -130,7 +130,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge otherresearchproducts</name> <name>Merge otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -157,7 +157,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge softwares</name> <name>Merge softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -184,7 +184,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge datasources</name> <name>Merge datasources</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -211,7 +211,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge organizations</name> <name>Merge organizations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -238,7 +238,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge projects</name> <name>Merge projects</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -265,7 +265,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge relations</name> <name>Merge relations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}

View File

@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Datasource;
public class MergeGraphSparkJobTest { public class MergeGraphTableSparkJobTest {
private ObjectMapper mapper; private ObjectMapper mapper;
@ -28,7 +28,7 @@ public class MergeGraphSparkJobTest {
public void testMergeDatasources() throws IOException { public void testMergeDatasources() throws IOException {
assertEquals( assertEquals(
"openaire-cris_1.1", "openaire-cris_1.1",
MergeGraphSparkJob MergeGraphTableSparkJob
.mergeDatasource( .mergeDatasource(
d("datasource_cris.json"), d("datasource_cris.json"),
d("datasource_UNKNOWN.json")) d("datasource_UNKNOWN.json"))
@ -36,7 +36,7 @@ public class MergeGraphSparkJobTest {
.getClassid()); .getClassid());
assertEquals( assertEquals(
"openaire-cris_1.1", "openaire-cris_1.1",
MergeGraphSparkJob MergeGraphTableSparkJob
.mergeDatasource( .mergeDatasource(
d("datasource_UNKNOWN.json"), d("datasource_UNKNOWN.json"),
d("datasource_cris.json")) d("datasource_cris.json"))
@ -44,7 +44,7 @@ public class MergeGraphSparkJobTest {
.getClassid()); .getClassid());
assertEquals( assertEquals(
"driver-openaire2.0", "driver-openaire2.0",
MergeGraphSparkJob MergeGraphTableSparkJob
.mergeDatasource( .mergeDatasource(
d("datasource_native.json"), d("datasource_native.json"),
d("datasource_driver-openaire2.0.json")) d("datasource_driver-openaire2.0.json"))
@ -52,7 +52,7 @@ public class MergeGraphSparkJobTest {
.getClassid()); .getClassid());
assertEquals( assertEquals(
"driver-openaire2.0", "driver-openaire2.0",
MergeGraphSparkJob MergeGraphTableSparkJob
.mergeDatasource( .mergeDatasource(
d("datasource_driver-openaire2.0.json"), d("datasource_driver-openaire2.0.json"),
d("datasource_native.json")) d("datasource_native.json"))
@ -60,7 +60,7 @@ public class MergeGraphSparkJobTest {
.getClassid()); .getClassid());
assertEquals( assertEquals(
"openaire4.0", "openaire4.0",
MergeGraphSparkJob MergeGraphTableSparkJob
.mergeDatasource( .mergeDatasource(
d("datasource_notCompatible.json"), d("datasource_notCompatible.json"),
d("datasource_openaire4.0.json")) d("datasource_openaire4.0.json"))
@ -68,7 +68,7 @@ public class MergeGraphSparkJobTest {
.getClassid()); .getClassid());
assertEquals( assertEquals(
"notCompatible", "notCompatible",
MergeGraphSparkJob MergeGraphTableSparkJob
.mergeDatasource( .mergeDatasource(
d("datasource_notCompatible.json"), d("datasource_notCompatible.json"),
d("datasource_UNKNOWN.json")) d("datasource_UNKNOWN.json"))

View File

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class GenerateEntitiesApplicationTest {
@Mock
private ISLookUpService isLookUpService;
@Mock
private VocabularyGroup vocs;
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
lenient()
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
.thenReturn(synonyms());
vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
}
@Test
public void testMergeResult() throws IOException {
Result publication = getResult("oaf_record.xml", Publication.class);
Result dataset = getResult("odf_dataset.xml", Dataset.class);
Result software = getResult("odf_software.xml", Software.class);
Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class);
verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
}
protected <T extends Result> void verifyMerge(Result publication, Result dataset, Class<T> clazz,
String resultType) {
final Result merge = OafMapperUtils.mergeResults(publication, dataset);
assertTrue(clazz.isAssignableFrom(merge.getClass()));
assertEquals(resultType, merge.getResulttype().getClassid());
}
protected <T extends Result> Result getResult(String xmlFileName, Class<T> clazz) throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName));
return new OdfToOafMapper(vocs, false)
.processMdRecord(xml)
.stream()
.filter(s -> clazz.isAssignableFrom(s.getClass()))
.map(s -> (Result) s)
.findFirst()
.get();
}
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
}
private List<String> synonyms() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
}
}