small adjustments in the provisioning workflow

This commit is contained in:
Claudio Atzori 2020-04-21 16:15:04 +02:00
parent 88fbb3a353
commit 0b55795d4d
4 changed files with 89 additions and 112 deletions

View File

@ -11,11 +11,13 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.*;
import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
@ -104,16 +106,12 @@ public class CreateRelatedEntitiesJob_phase1 {
SparkSession spark, SparkSession spark,
String inputRelationsPath, String inputRelationsPath,
String inputEntityPath, String inputEntityPath,
Class<E> entityClazz, Class<E> clazz,
String outputPath) { String outputPath) {
Dataset<Tuple2<String, SortableRelation>> relsByTarget = Dataset<Tuple2<String, SortableRelation>> relsByTarget =
readPathRelation(spark, inputRelationsPath) readPathRelation(spark, inputRelationsPath)
.filter( .filter("dataInfo.deletedbyinference == false")
(FilterFunction<SortableRelation>)
value ->
value.getDataInfo().getDeletedbyinference()
== false)
.map( .map(
(MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) (MapFunction<SortableRelation, Tuple2<String, SortableRelation>>)
r -> new Tuple2<>(r.getTarget(), r), r -> new Tuple2<>(r.getTarget(), r),
@ -122,10 +120,11 @@ public class CreateRelatedEntitiesJob_phase1 {
.cache(); .cache();
Dataset<Tuple2<String, RelatedEntity>> entities = Dataset<Tuple2<String, RelatedEntity>> entities =
readPathEntity(spark, inputEntityPath, entityClazz) readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, RelatedEntity>) (MapFunction<E, RelatedEntity>)
value -> asRelatedEntity(value, entityClazz), value -> asRelatedEntity(value, clazz),
Encoders.bean(RelatedEntity.class)) Encoders.bean(RelatedEntity.class))
.map( .map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) (MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>)
@ -146,7 +145,7 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.bean(EntityRelEntity.class)) Encoders.bean(EntityRelEntity.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath + "/" + EntityType.fromClass(entityClazz)); .parquet(outputPath + "/" + EntityType.fromClass(clazz));
} }
private static <E extends OafEntity> Dataset<E> readPathEntity( private static <E extends OafEntity> Dataset<E> readPathEntity(
@ -161,6 +160,81 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.bean(entityClazz)); Encoders.bean(entityClazz));
} }
/**
 * Builds the {@link RelatedEntity} summary of a graph entity.
 *
 * <p>The id, the entity type name, the pids and the collectedfrom values are always copied.
 * The remaining properties depend on the {@link EntityType} resolved from {@code clazz}:
 * results (publication, dataset, otherresearchproduct, software) contribute their first
 * title, date of acceptance, publisher, result type and instances; datasources,
 * organizations and projects contribute their own descriptive fields.
 *
 * @param entity the entity to summarise
 * @param clazz the class used to resolve the entity type; {@code entity} is cast to the
 *     model class matching that type
 * @param <E> the concrete entity type
 * @return a newly created {@link RelatedEntity} populated from {@code entity}
 */
public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
    // resolve the type once: it drives both the type label and the mapping below
    final EntityType type = EntityType.fromClass(clazz);

    final RelatedEntity re = new RelatedEntity();
    re.setId(entity.getId());
    re.setType(type.name());
    re.setPid(entity.getPid());
    re.setCollectedfrom(entity.getCollectedfrom());

    switch (type) {
        case publication:
        case dataset:
        case otherresearchproduct:
        case software:
            Result result = (Result) entity;
            // only the first title (if any) is kept
            if (result.getTitle() != null) {
                result.getTitle().stream().findFirst().ifPresent(re::setTitle);
            }
            re.setDateofacceptance(getValue(result.getDateofacceptance()));
            re.setPublisher(getValue(result.getPublisher()));
            re.setResulttype(result.getResulttype());
            re.setInstances(result.getInstance());
            // TODO still to be mapped
            // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
            break;
        case datasource:
            Datasource d = (Datasource) entity;
            re.setOfficialname(getValue(d.getOfficialname()));
            re.setWebsiteurl(getValue(d.getWebsiteurl()));
            re.setDatasourcetype(d.getDatasourcetype());
            re.setOpenairecompatibility(d.getOpenairecompatibility());
            break;
        case organization:
            Organization o = (Organization) entity;
            re.setLegalname(getValue(o.getLegalname()));
            re.setLegalshortname(getValue(o.getLegalshortname()));
            re.setCountry(o.getCountry());
            re.setWebsiteurl(getValue(o.getWebsiteurl()));
            break;
        case project:
            Project p = (Project) entity;
            re.setProjectTitle(getValue(p.getTitle()));
            re.setCode(getValue(p.getCode()));
            re.setAcronym(getValue(p.getAcronym()));
            re.setContracttype(p.getContracttype());
            final List<Field<String>> fundingtree = p.getFundingtree();
            // a missing (null) funding tree is treated like an empty one instead of failing
            if (fundingtree != null && !fundingtree.isEmpty()) {
                re.setFundingtree(
                        fundingtree.stream().map(Field::getValue).collect(Collectors.toList()));
            }
            break;
    }
    return re;
}
/**
 * Reads the string carried by {@code field}, falling back to the empty string.
 *
 * @param field the field to read, may be {@code null}
 * @return the field value, or {@code ""} when the field or its value is {@code null}
 */
private static String getValue(Field<String> field) {
    final String fallback = "";
    return getFieldValueWithDefault(field, fallback);
}
/**
 * Null-safe accessor for the value of a {@link Field}.
 *
 * @param f the field to read, may be {@code null}
 * @param defaultValue the value returned when {@code f} or its value is {@code null}
 * @param <T> the type of the wrapped value
 * @return the value of {@code f} when both the field and its value are non-null,
 *     {@code defaultValue} otherwise
 */
private static <T> T getFieldValueWithDefault(Field<T> f, T defaultValue) {
    if (f == null) {
        return defaultValue;
    }
    final T value = f.getValue();
    return value != null ? value : defaultValue;
}
/** /**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline
* delimited json text file, * delimited json text file,

View File

@ -76,9 +76,6 @@ public class PrepareRelationsJob {
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
int numPartitions = Integer.parseInt(parser.get("relPartitions"));
log.info("relPartitions: {}", numPartitions);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -86,27 +83,14 @@ public class PrepareRelationsJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions); prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
}); });
} }
private static void prepareRelationsFromPaths( private static void prepareRelationsFromPaths(
SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { SparkSession spark, String inputRelationsPath, String outputPath) {
readPathRelation(spark, inputRelationsPath) readPathRelation(spark, inputRelationsPath)
.filter( .filter("dataInfo.deletedbyinference == false")
(FilterFunction<SortableRelation>)
r -> {
try {
return r != null
&& r.getDataInfo() != null
&& !r.getDataInfo().getDeletedbyinference();
} catch (NullPointerException e) {
log.info(
"invalid NPE '{}'",
OBJECT_MAPPER.writeValueAsString(r));
throw e;
}
})
.groupByKey( .groupByKey(
(MapFunction<SortableRelation, String>) value -> value.getSource(), (MapFunction<SortableRelation, String>) value -> value.getSource(),
Encoders.STRING()) Encoders.STRING())
@ -114,7 +98,6 @@ public class PrepareRelationsJob {
(FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (FlatMapGroupsFunction<String, SortableRelation, SortableRelation>)
(key, values) -> Iterators.limit(values, MAX_RELS), (key, values) -> Iterators.limit(values, MAX_RELS),
Encoders.bean(SortableRelation.class)) Encoders.bean(SortableRelation.class))
.repartition(numPartitions)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);

View File

@ -3,14 +3,8 @@ package eu.dnetlib.dhp.oa.provision.utils;
import static org.apache.commons.lang3.StringUtils.substringAfter; import static org.apache.commons.lang3.StringUtils.substringAfter;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
public class GraphMappingUtils { public class GraphMappingUtils {
@ -18,81 +12,6 @@ public class GraphMappingUtils {
public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier"); public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
/**
 * Builds the {@link RelatedEntity} summary of a graph entity.
 *
 * <p>The id, the entity type name, the pids and the collectedfrom values are always copied.
 * The remaining properties depend on the {@link EntityType} resolved from {@code clazz}:
 * results (publication, dataset, otherresearchproduct, software) contribute their first
 * title, date of acceptance, publisher, result type and instances; datasources,
 * organizations and projects contribute their own descriptive fields.
 *
 * @param entity the entity to summarise
 * @param clazz the class used to resolve the entity type; {@code entity} is cast to the
 *     model class matching that type
 * @param <E> the concrete entity type
 * @return a newly created {@link RelatedEntity} populated from {@code entity}
 */
public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
    // resolve the type once: it drives both the type label and the mapping below
    final EntityType type = EntityType.fromClass(clazz);

    final RelatedEntity re = new RelatedEntity();
    re.setId(entity.getId());
    re.setType(type.name());
    re.setPid(entity.getPid());
    re.setCollectedfrom(entity.getCollectedfrom());

    switch (type) {
        case publication:
        case dataset:
        case otherresearchproduct:
        case software:
            Result result = (Result) entity;
            // the title list must be present (non-null) and non-empty before reading its
            // first element; only that first title is kept
            if (result.getTitle() != null && !result.getTitle().isEmpty()) {
                re.setTitle(result.getTitle().stream().findFirst().get());
            }
            re.setDateofacceptance(getValue(result.getDateofacceptance()));
            re.setPublisher(getValue(result.getPublisher()));
            re.setResulttype(result.getResulttype());
            re.setInstances(result.getInstance());
            // TODO still to be mapped
            // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
            break;
        case datasource:
            Datasource d = (Datasource) entity;
            re.setOfficialname(getValue(d.getOfficialname()));
            re.setWebsiteurl(getValue(d.getWebsiteurl()));
            re.setDatasourcetype(d.getDatasourcetype());
            re.setOpenairecompatibility(d.getOpenairecompatibility());
            break;
        case organization:
            Organization o = (Organization) entity;
            re.setLegalname(getValue(o.getLegalname()));
            re.setLegalshortname(getValue(o.getLegalshortname()));
            re.setCountry(o.getCountry());
            re.setWebsiteurl(getValue(o.getWebsiteurl()));
            break;
        case project:
            Project p = (Project) entity;
            re.setProjectTitle(getValue(p.getTitle()));
            re.setCode(getValue(p.getCode()));
            re.setAcronym(getValue(p.getAcronym()));
            re.setContracttype(p.getContracttype());
            final List<Field<String>> fundingtree = p.getFundingtree();
            // a missing (null) funding tree is treated like an empty one instead of failing
            if (fundingtree != null && !fundingtree.isEmpty()) {
                re.setFundingtree(
                        fundingtree.stream().map(Field::getValue).collect(Collectors.toList()));
            }
            break;
    }
    return re;
}
/**
 * Reads the string carried by {@code field}, falling back to the empty string.
 *
 * @param field the field to read, may be {@code null}
 * @return the field value, or {@code ""} when the field or its value is {@code null}
 */
private static String getValue(Field<String> field) {
    final String fallback = "";
    return getFieldValueWithDefault(field, fallback);
}
/**
 * Null-safe accessor for the value of a {@link Field}.
 *
 * @param f the field to read, may be {@code null}
 * @param defaultValue the value returned when {@code f} or its value is {@code null}
 * @param <T> the type of the wrapped value
 * @return the value of {@code f} when both the field and its value are non-null,
 *     {@code defaultValue} otherwise
 */
private static <T> T getFieldValueWithDefault(Field<T> f, T defaultValue) {
    if (f == null) {
        return defaultValue;
    }
    final T value = f.getValue();
    return value != null ? value : defaultValue;
}
public static String removePrefix(final String s) { public static String removePrefix(final String s) {
if (s.contains("|")) return substringAfter(s, "|"); if (s.contains("|")) return substringAfter(s, "|");
return s; return s;

View File

@ -98,6 +98,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg> <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg> <arg>--outputPath</arg><arg>${workingDir}/relation</arg>