diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 6b1dd52ce..df5b8779a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -11,11 +11,13 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.*; +import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -104,16 +106,12 @@ public class CreateRelatedEntitiesJob_phase1 { SparkSession spark, String inputRelationsPath, String inputEntityPath, - Class entityClazz, + Class clazz, String outputPath) { Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) - .filter( - (FilterFunction) - value -> - value.getDataInfo().getDeletedbyinference() - == false) + .filter("dataInfo.deletedbyinference == false") .map( (MapFunction>) r -> new Tuple2<>(r.getTarget(), r), @@ -122,10 +120,11 @@ public class CreateRelatedEntitiesJob_phase1 { .cache(); Dataset> entities = - readPathEntity(spark, inputEntityPath, entityClazz) + readPathEntity(spark, inputEntityPath, clazz) + .filter("dataInfo.invisible == false") .map( (MapFunction) - value -> asRelatedEntity(value, entityClazz), + value -> asRelatedEntity(value, clazz), Encoders.bean(RelatedEntity.class)) .map( (MapFunction>) @@ -146,7 +145,7 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.bean(EntityRelEntity.class)) .write() .mode(SaveMode.Overwrite) - .parquet(outputPath + "/" + EntityType.fromClass(entityClazz)); + .parquet(outputPath + "/" + EntityType.fromClass(clazz)); } private static Dataset readPathEntity( @@ -161,6 +160,81 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.bean(entityClazz)); } + public static RelatedEntity asRelatedEntity(E entity, Class clazz) { + + final RelatedEntity re = new RelatedEntity(); + re.setId(entity.getId()); + re.setType(EntityType.fromClass(clazz).name()); + + re.setPid(entity.getPid()); + re.setCollectedfrom(entity.getCollectedfrom()); + + switch (EntityType.fromClass(clazz)) { + case publication: + case dataset: + case otherresearchproduct: + case software: + Result result = (Result) entity; + + if (result.getTitle() != null && !result.getTitle().isEmpty()) { + re.setTitle(result.getTitle().stream().findFirst().get()); + } + + re.setDateofacceptance(getValue(result.getDateofacceptance())); + re.setPublisher(getValue(result.getPublisher())); + re.setResulttype(result.getResulttype()); + re.setInstances(result.getInstance()); + + // TODO still to be mapped + // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); + + break; + case datasource: + Datasource d = (Datasource) entity; + + re.setOfficialname(getValue(d.getOfficialname())); + re.setWebsiteurl(getValue(d.getWebsiteurl())); + re.setDatasourcetype(d.getDatasourcetype()); + re.setOpenairecompatibility(d.getOpenairecompatibility()); + + break; + case organization: + Organization o = (Organization) entity; + + re.setLegalname(getValue(o.getLegalname())); + re.setLegalshortname(getValue(o.getLegalshortname())); + re.setCountry(o.getCountry()); + re.setWebsiteurl(getValue(o.getWebsiteurl())); + break; + case project: + Project p = (Project) entity; + + re.setProjectTitle(getValue(p.getTitle())); + re.setCode(getValue(p.getCode())); + re.setAcronym(getValue(p.getAcronym())); + re.setContracttype(p.getContracttype()); + + List> f = p.getFundingtree(); + if (!f.isEmpty()) { + re.setFundingtree( + f.stream().map(s -> s.getValue()).collect(Collectors.toList())); + } + break; + } + return re; + } + + private static String getValue(Field field) { + return getFieldValueWithDefault(field, ""); + } + + private static T getFieldValueWithDefault(Field f, T defaultValue) { + return Optional.ofNullable(f) + .filter(Objects::nonNull) + .map(x -> x.getValue()) + .orElse(defaultValue); + } + /** * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline * delimited json text file, diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index b663a8082..87264df05 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -76,9 +76,6 @@ public class PrepareRelationsJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - int numPartitions = Integer.parseInt(parser.get("relPartitions")); - log.info("relPartitions: {}", numPartitions); - SparkConf conf = new SparkConf(); runWithSparkSession( @@ -86,27 +83,14 @@ public class PrepareRelationsJob { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions); + prepareRelationsFromPaths(spark, inputRelationsPath, outputPath); }); } private static void prepareRelationsFromPaths( - SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { + SparkSession spark, String inputRelationsPath, String outputPath) { readPathRelation(spark, inputRelationsPath) - .filter( - (FilterFunction) - r -> { - try { - return r != null - && r.getDataInfo() != null - && !r.getDataInfo().getDeletedbyinference(); - } catch (NullPointerException e) { - log.info( - "invalid NPE '{}'", - OBJECT_MAPPER.writeValueAsString(r)); - throw e; - } - }) + .filter("dataInfo.deletedbyinference == false") .groupByKey( (MapFunction) value -> value.getSource(), Encoders.STRING()) @@ -114,7 +98,6 @@ public class PrepareRelationsJob { (FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(SortableRelation.class)) - .repartition(numPartitions) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java index cea3539bc..ebd33363b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java @@ -3,14 +3,8 @@ package eu.dnetlib.dhp.oa.provision.utils; import static org.apache.commons.lang3.StringUtils.substringAfter; import com.google.common.collect.Sets; -import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; -import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.oaf.*; -import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; public class GraphMappingUtils { @@ -18,81 +12,6 @@ public class GraphMappingUtils { public static Set authorPidTypes = Sets.newHashSet("orcid", "magidentifier"); - public static RelatedEntity asRelatedEntity(E entity, Class clazz) { - - final RelatedEntity re = new RelatedEntity(); - re.setId(entity.getId()); - re.setType(EntityType.fromClass(clazz).name()); - - re.setPid(entity.getPid()); - re.setCollectedfrom(entity.getCollectedfrom()); - - switch (EntityType.fromClass(clazz)) { - case publication: - case dataset: - case otherresearchproduct: - case software: - Result result = (Result) entity; - - if (result.getTitle() == null && !result.getTitle().isEmpty()) { - re.setTitle(result.getTitle().stream().findFirst().get()); - } - - re.setDateofacceptance(getValue(result.getDateofacceptance())); - re.setPublisher(getValue(result.getPublisher())); - re.setResulttype(result.getResulttype()); - re.setInstances(result.getInstance()); - - // TODO still to be mapped - // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); - - break; - case datasource: - Datasource d = (Datasource) entity; - - re.setOfficialname(getValue(d.getOfficialname())); - re.setWebsiteurl(getValue(d.getWebsiteurl())); - re.setDatasourcetype(d.getDatasourcetype()); - re.setOpenairecompatibility(d.getOpenairecompatibility()); - - break; - case organization: - Organization o = (Organization) entity; - - re.setLegalname(getValue(o.getLegalname())); - re.setLegalshortname(getValue(o.getLegalshortname())); - re.setCountry(o.getCountry()); - re.setWebsiteurl(getValue(o.getWebsiteurl())); - break; - case project: - Project p = (Project) entity; - - re.setProjectTitle(getValue(p.getTitle())); - re.setCode(getValue(p.getCode())); - re.setAcronym(getValue(p.getAcronym())); - re.setContracttype(p.getContracttype()); - - List> f = p.getFundingtree(); - if (!f.isEmpty()) { - re.setFundingtree( - f.stream().map(s -> s.getValue()).collect(Collectors.toList())); - } - break; - } - return re; - } - - private static String getValue(Field field) { - return getFieldValueWithDefault(field, ""); - } - - private static T getFieldValueWithDefault(Field f, T defaultValue) { - return Optional.ofNullable(f) - .filter(Objects::nonNull) - .map(x -> x.getValue()) - .orElse(defaultValue); - } - public static String removePrefix(final String s) { if (s.contains("|")) return substringAfter(s, "|"); return s; diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index e6587ef5e..923f6de69 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -98,6 +98,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation