WIP: added oozie workflow for grouping graph entities by id

Claudio Atzori 2020-11-13 10:05:12 +01:00
parent 13e36a4da0
commit 2bed29eb09
19 changed files with 578 additions and 211 deletions

View File

@ -1,2 +1,297 @@
package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
public static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
if (ModelSupport.isSubClass(o1, Result.class)) {
return mergeResults((Result) o1, (Result) o2);
} else if (ModelSupport.isSubClass(o1, Datasource.class)) {
((Datasource) o1).mergeFrom((Datasource) o2);
} else if (ModelSupport.isSubClass(o1, Organization.class)) {
((Organization) o1).mergeFrom((Organization) o2);
} else if (ModelSupport.isSubClass(o1, Project.class)) {
((Project) o1).mergeFrom((Project) o2);
} else {
throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
}
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
}
public static Result mergeResults(Result r1, Result r2) {
if (new ResultTypeComparator().compare(r1, r2) < 0) {
r1.mergeFrom(r2);
return r1;
} else {
r2.mergeFrom(r1);
return r2;
}
}
public static KeyValue keyValue(final String k, final String v) {
final KeyValue kv = new KeyValue();
kv.setKey(k);
kv.setValue(v);
return kv;
}
public static List<KeyValue> listKeyValues(final String... s) {
if (s.length % 2 > 0) {
throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)");
}
final List<KeyValue> list = new ArrayList<>();
for (int i = 0; i < s.length; i += 2) {
list.add(keyValue(s[i], s[i + 1]));
}
return list;
}
public static <T> Field<T> field(final T value, final DataInfo info) {
if (value == null || StringUtils.isBlank(value.toString())) {
return null;
}
final Field<T> field = new Field<>();
field.setValue(value);
field.setDataInfo(info);
return field;
}
public static List<Field<String>> listFields(final DataInfo info, final String... values) {
return Arrays
.stream(values)
.map(v -> field(v, info))
.filter(Objects::nonNull)
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
public static List<Field<String>> listFields(final DataInfo info, final List<String> values) {
return values
.stream()
.map(v -> field(v, info))
.filter(Objects::nonNull)
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
public static Qualifier unknown(final String schemeid, final String schemename) {
return qualifier("UNKNOWN", "Unknown", schemeid, schemename);
}
public static Qualifier qualifier(
final String classid,
final String classname,
final String schemeid,
final String schemename) {
final Qualifier q = new Qualifier();
q.setClassid(classid);
q.setClassname(classname);
q.setSchemeid(schemeid);
q.setSchemename(schemename);
return q;
}
public static StructuredProperty structuredProperty(
final String value,
final String classid,
final String classname,
final String schemeid,
final String schemename,
final DataInfo dataInfo) {
return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
}
public static StructuredProperty structuredProperty(
final String value,
final Qualifier qualifier,
final DataInfo dataInfo) {
if (value == null) {
return null;
}
final StructuredProperty sp = new StructuredProperty();
sp.setValue(value);
sp.setQualifier(qualifier);
sp.setDataInfo(dataInfo);
return sp;
}
public static ExtraInfo extraInfo(
final String name,
final String value,
final String typology,
final String provenance,
final String trust) {
final ExtraInfo info = new ExtraInfo();
info.setName(name);
info.setValue(value);
info.setTypology(typology);
info.setProvenance(provenance);
info.setTrust(trust);
return info;
}
public static OAIProvenance oaiIProvenance(
final String identifier,
final String baseURL,
final String metadataNamespace,
final Boolean altered,
final String datestamp,
final String harvestDate) {
final OriginDescription desc = new OriginDescription();
desc.setIdentifier(identifier);
desc.setBaseURL(baseURL);
desc.setMetadataNamespace(metadataNamespace);
desc.setAltered(altered);
desc.setDatestamp(datestamp);
desc.setHarvestDate(harvestDate);
final OAIProvenance p = new OAIProvenance();
p.setOriginDescription(desc);
return p;
}
public static Journal journal(
final String name,
final String issnPrinted,
final String issnOnline,
final String issnLinking,
final DataInfo dataInfo) {
return journal(
name,
issnPrinted,
issnOnline,
issnLinking,
null,
null,
null,
null,
null,
null,
null,
dataInfo);
}
public static Journal journal(
final String name,
final String issnPrinted,
final String issnOnline,
final String issnLinking,
final String ep,
final String iss,
final String sp,
final String vol,
final String edition,
final String conferenceplace,
final String conferencedate,
final DataInfo dataInfo) {
if (StringUtils.isNotBlank(name)
|| StringUtils.isNotBlank(issnPrinted)
|| StringUtils.isNotBlank(issnOnline)
|| StringUtils.isNotBlank(issnLinking)) {
final Journal j = new Journal();
j.setName(name);
j.setIssnPrinted(issnPrinted);
j.setIssnOnline(issnOnline);
j.setIssnLinking(issnLinking);
j.setEp(ep);
j.setIss(iss);
j.setSp(sp);
j.setVol(vol);
j.setEdition(edition);
j.setConferenceplace(conferenceplace);
j.setConferencedate(conferencedate);
j.setDataInfo(dataInfo);
return j;
} else {
return null;
}
}
public static DataInfo dataInfo(
final Boolean deletedbyinference,
final String inferenceprovenance,
final Boolean inferred,
final Boolean invisible,
final Qualifier provenanceaction,
final String trust) {
final DataInfo d = new DataInfo();
d.setDeletedbyinference(deletedbyinference);
d.setInferenceprovenance(inferenceprovenance);
d.setInferred(inferred);
d.setInvisible(invisible);
d.setProvenanceaction(provenanceaction);
d.setTrust(trust);
return d;
}
public static String createOpenaireId(
final int prefix,
final String originalId,
final boolean to_md5) {
if (StringUtils.isBlank(originalId)) {
return null;
} else if (to_md5) {
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
final String rest = StringUtils.substringAfter(originalId, "::");
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
} else {
return String.format("%s|%s", prefix, originalId);
}
}
public static String createOpenaireId(
final String type,
final String originalId,
final boolean to_md5) {
switch (type) {
case "datasource":
return createOpenaireId(10, originalId, to_md5);
case "organization":
return createOpenaireId(20, originalId, to_md5);
case "person":
return createOpenaireId(30, originalId, to_md5);
case "project":
return createOpenaireId(40, originalId, to_md5);
default:
return createOpenaireId(50, originalId, to_md5);
}
}
public static String asString(final Object o) {
return o == null ? "" : o.toString();
}
public static <T> Predicate<T> distinctByKey(
final Function<? super T, ?> keyExtractor) {
final Map<Object, Boolean> seen = new ConcurrentHashMap<>();
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}
}
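
A minimal usage sketch of the helpers above (qualifier/dataInfo/field/structuredProperty/listKeyValues and createOpenaireId). The vocabulary ids, trust value and identifiers are illustrative, not taken from this commit; only the method signatures come from the class listed here.

import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;

import java.util.List;

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

public class OafMapperUtilsUsageSketch {

	public static void main(String[] args) {
		// Provenance qualifier and DataInfo carrier; class ids and trust are illustrative values.
		Qualifier provenance = qualifier("sysimport:crosswalk", "Harvested", "dnet:provenanceActions", "dnet:provenanceActions");
		DataInfo info = dataInfo(false, null, false, false, provenance, "0.9");

		// field(...) and structuredProperty(...) return null on blank input, so callers can filter them out.
		Field<String> title = field("An example title", info);
		StructuredProperty pid = structuredProperty("10.1234/abc", "doi", "doi", "dnet:pid_types", "dnet:pid_types", info);

		// listKeyValues expects an even number of arguments: key, value, key, value, ...
		List<KeyValue> collectedFrom = listKeyValues("10|openaire____::1234", "Example datasource");

		// Non-registered types fall into the default branch (prefix 50); the local part is md5-hashed when requested.
		String id = createOpenaireId("result", "doi_________::10.1234/abc", true);

		System.out.println(id + " | " + title.getValue() + " | " + pid.getValue() + " | " + collectedFrom.size());
	}
}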

View File

@ -1,2 +1,49 @@
package eu.dnetlib.dhp.schema.oaf;
import java.util.Comparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class ResultTypeComparator implements Comparator<Result> {
@Override
public int compare(Result left, Result right) {
if (left == null && right == null)
return 0;
if (left == null)
return 1;
if (right == null)
return -1;
String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid();
if (lClass.equals(rClass))
return 0;
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1;
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
}
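
A short sketch of how this comparator drives OafMapperUtils.mergeResults: the record with the stronger result type is kept as the merge target. It assumes the usual setter on Result and an illustrative scheme name for the qualifier; only the classid constants come from the code above.

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;

public class MergeResultsSketch {

	public static void main(String[] args) {
		// Two records that would share the same id but carry different result types.
		Result publication = new Publication();
		publication.setResulttype(resultType(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID));

		Result software = new Software();
		software.setResulttype(resultType(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID));

		// ResultTypeComparator ranks publication < dataset < software < orp, so the publication
		// is kept as the base record and the software record is merged into it.
		Result merged = OafMapperUtils.mergeResults(publication, software);
		System.out.println(merged.getResulttype().getClassid());
	}

	private static Qualifier resultType(String classid) {
		// "dnet:result_typologies" is assumed here purely for illustration.
		return OafMapperUtils.qualifier(classid, classid, "dnet:result_typologies", "dnet:result_typologies");
	}
}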

View File

@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.List;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
@ -15,9 +16,15 @@ import org.apache.commons.codec.binary.Hex;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray; import net.minidev.json.JSONArray;
import scala.collection.JavaConverters;
import scala.collection.Seq;
public class DHPUtils { public class DHPUtils {
public static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
public static String md5(final String s) { public static String md5(final String s) {
try { try {
final MessageDigest md = MessageDigest.getInstance("MD5"); final MessageDigest md = MessageDigest.getInstance("MD5");
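
The new DHPUtils.toSeq helper bridges Java lists to the Scala Seq expected by spark.read().textFile(...). A minimal sketch with illustrative paths:

import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;

import java.util.Arrays;
import java.util.List;

import scala.collection.Seq;

public class ToSeqSketch {

	public static void main(String[] args) {
		// Converts a Java list of input paths into the Scala Seq that the Dataset reader accepts.
		List<String> paths = Arrays.asList("/tmp/graph/publication", "/tmp/graph/relation");
		Seq<String> scalaPaths = toSeq(paths);
		System.out.println(scalaPaths);
	}
}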

View File

@ -3,13 +3,9 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedInputStream;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -23,11 +19,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

View File

@ -11,7 +11,6 @@ import org.apache.commons.lang3.StringUtils;
import com.clearspring.analytics.util.Lists; import com.clearspring.analytics.util.Lists;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;

View File

@ -1,11 +1,10 @@
package eu.dnetlib.dhp.oa.graph.fuse; package eu.dnetlib.dhp.oa.graph.groupbyid;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -17,14 +16,22 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import scala.Tuple2;
public class DispatchEntitiesSparkJob { public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
@ -40,48 +47,51 @@ public class DispatchEntitiesSparkJob {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String sourcePath = parser.get("sourcePath"); final String entitiesPath = parser.get("entitiesPath");
final String targetPath = parser.get("graphRawPath"); log.info("entitiesPath: {}", entitiesPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, targetPath); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
ModelSupport.oafTypes dispatchOaf(spark, entityClazz, entitiesPath, outputPath);
.values()
.forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath));
}); });
} }
private static <T extends Oaf> void processEntity( private static <T extends Oaf> void dispatchOaf(
final SparkSession spark, final SparkSession spark,
final Class<T> clazz, final Class<T> clazz,
final String sourcePath, final String sourcePath,
final String targetPath) { final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase();
log.info("Processing entities ({}) in file: {}", type, sourcePath); log.info("Processing entities ({}) in file: {}", clazz.getName(), sourcePath);
spark spark
.read() .read()
.textFile(sourcePath) .textFile(sourcePath)
.filter((FilterFunction<String>) value -> isEntityType(value, type)) .filter((FilterFunction<String>) s -> isEntityType(s, clazz))
.map( .map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
(MapFunction<String, String>) l -> StringUtils.substringAfter(l, "|"),
Encoders.STRING())
.write() .write()
.option("compression", "gzip") .option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.text(targetPath + "/" + type); .text(targetPath);
} }
private static boolean isEntityType(final String line, final String type) { private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type); return StringUtils.substringBefore(s, "|").equals(clazz.getName());
} }
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
} }
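
The dispatch step relies on the "fully.qualified.ClassName|json" text encoding produced by the grouping job (next file). A standalone sketch of that prefix handling, with a made-up identifier:

import org.apache.commons.lang3.StringUtils;

public class DispatchLineFormatSketch {

	public static void main(String[] args) {
		// Hypothetical line as written by the grouping step: fully qualified class name,
		// a '|' separator, then the JSON payload (which may itself contain '|' inside the id).
		String line = "eu.dnetlib.dhp.schema.oaf.Publication|{\"id\":\"50|doi_________::abc\"}";

		// substringBefore/substringAfter split on the first '|' only, so the pipe
		// inside the OpenAIRE identifier is left untouched.
		String className = StringUtils.substringBefore(line, "|");
		String json = StringUtils.substringAfter(line, "|");

		boolean keep = className.equals("eu.dnetlib.dhp.schema.oaf.Publication");
		System.out.println(keep + " -> " + json);
	}
}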

View File

@ -12,9 +12,9 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
@ -22,7 +22,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -33,19 +36,21 @@ import scala.Tuple2;
/** /**
* Groups the graph content by entity identifier to ensure ID uniqueness * Groups the graph content by entity identifier to ensure ID uniqueness
*/ */
public class GroupEntitiesSparkJob { public class GroupEntitiesAndRelationsSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class);
private final static String ID_JPATH = "$.id"; private final static String ID_JPATH = "$.id";
private final static String SOURCE_JPATH = "$.source";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
GroupEntitiesSparkJob.class GroupEntitiesAndRelationsSparkJob.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json")); "/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@ -72,11 +77,11 @@ public class GroupEntitiesSparkJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath); groupEntitiesAndRelations(spark, graphInputPath, outputPath);
}); });
} }
private static void groupEntities( private static void groupEntitiesAndRelations(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String outputPath) { String outputPath) {
@ -85,14 +90,19 @@ public class GroupEntitiesSparkJob {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark spark
.read() .read()
.textFile(toSeq(listEntityPaths(inputPath, sc))) .textFile(toSeq(listPaths(inputPath, sc)))
.map((MapFunction<String, Oaf>) s -> parseEntity(s), Encoders.kryo(Oaf.class)) .map((MapFunction<String, Oaf>) s -> parseOaf(s), Encoders.kryo(Oaf.class))
.filter((FilterFunction<Oaf>) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf)))
.groupByKey((MapFunction<Oaf, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) .groupByKey((MapFunction<Oaf, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
.agg(aggregator) .agg(aggregator)
.map((MapFunction<Tuple2<String, Oaf>, Oaf>) Tuple2::_2, Encoders.kryo(Oaf.class)) .map(
(MapFunction<Tuple2<String, Oaf>, String>) t -> t._2().getClass().getName() +
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
Encoders.STRING())
.write() .write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.save(outputPath); .text(outputPath);
} }
public static class GroupingAggregator extends Aggregator<Oaf, Oaf, Oaf> { public static class GroupingAggregator extends Aggregator<Oaf, Oaf, Oaf> {
@ -136,51 +146,61 @@ public class GroupEntitiesSparkJob {
} }
private static <T extends Oaf> Oaf parseEntity(String s) { private static Oaf parseOaf(String s) {
String prefix = StringUtils.substringBefore(jPath(ID_JPATH, s), "|");
try { DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) { switch (prefix) {
case "10": case "10":
return OBJECT_MAPPER.readValue(s, Datasource.class); return parse(s, Datasource.class);
case "20": case "20":
return OBJECT_MAPPER.readValue(s, Organization.class); return parse(s, Organization.class);
case "40": case "40":
return OBJECT_MAPPER.readValue(s, Project.class); return parse(s, Project.class);
case "50": case "50":
String resultType = jPath("$.resulttype.classid", s); String resultType = dc.read("$.resulttype.classid");
switch (resultType) { switch (resultType) {
case "publication": case "publication":
return OBJECT_MAPPER.readValue(s, Publication.class); return parse(s, Publication.class);
case "dataset": case "dataset":
return OBJECT_MAPPER.readValue(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software": case "software":
return OBJECT_MAPPER.readValue(s, Software.class); return parse(s, Software.class);
case "other": case "other":
return OBJECT_MAPPER.readValue(s, OtherResearchProduct.class); return parse(s, OtherResearchProduct.class);
default: default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
} }
default: default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
} }
} else {
String source = dc.read(SOURCE_JPATH);
if (StringUtils.isNotBlank(source)) {
return parse(s, Relation.class);
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
}
private static <T extends Oaf> Oaf parse(String s, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) { private static List<String> listPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration()) .listFiles(inputPath, sc.hadoopConfiguration())
.stream() .stream()
.filter(p -> !p.contains("relation"))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static String jPath(final String path, final String json) {
Object o = JsonPath.read(json, path);
if (o instanceof String)
return (String) o;
throw new IllegalStateException(String.format("could not extract '%s' from:\n%s", path, json));
}
} }
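
The routing inside parseOaf can be exercised in isolation: entities are recognised by the prefix of their id, relations by the presence of a source field. A minimal sketch with made-up records:

import org.apache.commons.lang3.StringUtils;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;

public class ParseOafRoutingSketch {

	public static void main(String[] args) {
		// An entity record carries an "id" whose prefix selects the target class,
		// while a relation record has no "id" and is recognised by its "source" field.
		route("{\"id\":\"50|doi_________::abc\",\"resulttype\":{\"classid\":\"publication\"}}");
		route("{\"source\":\"50|doi_________::abc\",\"target\":\"40|corda_______::xyz\"}");
	}

	private static void route(String json) {
		// SUPPRESS_EXCEPTIONS makes missing paths read as null instead of throwing.
		DocumentContext dc = JsonPath
			.parse(json, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
		String id = dc.read("$.id");
		if (StringUtils.isNotBlank(id)) {
			System.out.println("entity with prefix " + StringUtils.substringBefore(id, "|"));
		} else {
			System.out.println("relation from " + dc.read("$.source"));
		}
	}
}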

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.util.*; import java.util.*;

View File

@ -4,9 +4,11 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -18,7 +20,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,7 +69,7 @@ public class GenerateEntitiesApplication {
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, targetPath); HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
generateEntities(spark, vocs, sourcePaths, targetPath); generateEntities(spark, vocs, sourcePaths, targetPath);
}); });
} }
@ -82,7 +83,7 @@ public class GenerateEntitiesApplication {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final List<String> existingSourcePaths = Arrays final List<String> existingSourcePaths = Arrays
.stream(sourcePaths.split(",")) .stream(sourcePaths.split(","))
.filter(p -> exists(sc, p)) .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.collect(Collectors.toList()); .collect(Collectors.toList());
log.info("Generate entities from files:"); log.info("Generate entities from files:");
@ -160,17 +161,4 @@ public class GenerateEntitiesApplication {
} }
} }
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
try {
final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration());
final Path path = new Path(pathToFile);
return hdfs.exists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
private static void removeOutputDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
} }
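
The private removeOutputDir/exists helpers dropped here are replaced by the shared HdfsSupport calls. A minimal sketch of those calls outside Spark, with illustrative paths:

import org.apache.hadoop.conf.Configuration;

import eu.dnetlib.dhp.common.HdfsSupport;

public class HdfsSupportSketch {

	public static void main(String[] args) {
		// In the jobs above the configuration comes from spark.sparkContext().hadoopConfiguration();
		// a plain Configuration picks up whatever core-site.xml/hdfs-site.xml is on the classpath.
		Configuration conf = new Configuration();
		String outputPath = "/tmp/example/output"; // illustrative path

		if (HdfsSupport.exists(outputPath, conf)) {
			HdfsSupport.remove(outputPath, conf);
		}

		// Listing works the same way and is how the grouping job collects its input files.
		HdfsSupport.listFiles("/tmp/example/graph", conf).forEach(System.out::println);
	}
}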

View File

@ -1,15 +1,6 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@ -32,6 +23,7 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT; import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM; import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw; package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;

View File

@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Vocabulary implements Serializable { public class Vocabulary implements Serializable {

View File

@ -7,6 +7,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

View File

@ -6,15 +6,21 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "s", "paramName": "ep",
"paramLongName": "sourcePath", "paramLongName": "entitiesPath",
"paramDescription": "the source path", "paramDescription": "the entities path",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "g", "paramName": "g",
"paramLongName": "graphRawPath", "paramLongName": "outputPath",
"paramDescription": "the path of the graph Raw in hdfs", "paramDescription": "the output path to store the dispatched entities",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name modelling the graph table",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -1,4 +1,4 @@
<workflow-app name="fuse graph entities by id" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="group graph entities and relations" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
@ -46,18 +46,18 @@
</property> </property>
</parameters> </parameters>
<start to="fuse_graph"/> <start to="group_entities"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="fuse_graph"> <action name="group_entities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Fuse graph entities by ID</name> <name>group graph entities and relations</name>
<class>eu.dnetlib.dhp.oa.graph.fuse.FuseGraphResultsSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.GroupEntitiesAndRelationsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -70,18 +70,29 @@
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphInputPath}</arg> <arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/entities</arg> <arg>--outputPath</arg><arg>${workingDir}/grouped_entities</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="fork_dispatch_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="merge_dataset"> <fork name="fork_dispatch_entities">
<path start="dispatch_publication"/>
<path start="dispatch_dataset"/>
<path start="dispatch_software"/>
<path start="dispatch_otherresearchproduct"/>
<path start="dispatch_datasource"/>
<path start="dispatch_organization"/>
<path start="dispatch_project"/>
<path start="dispatch_relation"/>
</fork>
<action name="dispatch_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge datasets</name> <name>Dispatch publications</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -93,22 +104,45 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg> <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg> <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch datasets</name>
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_dispatch"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="merge_otherresearchproduct"> <action name="dispatch_software">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge otherresearchproducts</name> <name>Dispatch softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -120,49 +154,20 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg> <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg> <arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Merge softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_dispatch"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="merge_datasource"> <action name="dispatch_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge datasources</name> <name>Dispatch otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -174,22 +179,45 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg> <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg> <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch datasources</name>
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${outputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_dispatch"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="merge_organization"> <action name="dispatch_organization">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge organizations</name> <name>Dispatch organizations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -201,22 +229,20 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg> <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg> <arg>--outputPath</arg><arg>${outputPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_dispatch"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="merge_project"> <action name="dispatch_project">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge projects</name> <name>Dispatch project</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -228,22 +254,20 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg> <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg> <arg>--outputPath</arg><arg>${outputPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_dispatch"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="merge_relation"> <action name="dispatch_relation">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Merge relations</name> <name>Dispatch relations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class> <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -255,17 +279,15 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg> <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg> <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>
</spark> </spark>
<ok to="wait_merge"/> <ok to="wait_dispatch"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_merge" to="End"/> <join name="wait_dispatch" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -2,11 +2,11 @@
<parameters> <parameters>
<property> <property>
<name>betaInputGgraphPath</name> <name>betaInputGraphPath</name>
<description>the beta graph root path</description> <description>the beta graph root path</description>
</property> </property>
<property> <property>
<name>prodInputGgraphPath</name> <name>prodInputGraphPath</name>
<description>the production graph root path</description> <description>the production graph root path</description>
</property> </property>
<property> <property>

View File

@ -27,14 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class MigrateDbEntitiesApplicationTest { public class MigrateDbEntitiesApplicationTest {

View File

@ -2,12 +2,11 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -28,13 +27,11 @@ import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2; import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/** /**
* XmlConverterJob converts the JoinedEntities as XML records * XmlConverterJob converts the JoinedEntities as XML records
@ -43,8 +40,6 @@ public class XmlConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -129,10 +124,6 @@ public class XmlConverterJob {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) { private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
Map<String, LongAccumulator> accumulators = Maps.newHashMap(); Map<String, LongAccumulator> accumulators = Maps.newHashMap();
accumulators accumulators