enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
8 changed files with 47 additions and 72 deletions
Showing only changes of commit c3286f4c37 - Show all commits

View File

@ -15,9 +15,11 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareRelatedDatasetsJob { public class PrepareRelatedDatasetsJob {
@ -60,17 +62,18 @@ public class PrepareRelatedDatasetsJob {
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels rels
.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
.map( .map(t -> {
t -> new RelatedDataset( final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
t._1.getSource(), rel.getRelDataset().setRelType(t._1.getRelClass());
t._1.getRelType(), return rel;
t._2), }, Encoders.bean(RelatedDataset.class))
Encoders.bean(RelatedDataset.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(relsPath); .json(relsPath);

View File

@ -15,7 +15,9 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
@ -58,22 +60,21 @@ public class PrepareRelatedProjectsJob {
ClusterUtils.removeDir(spark, relsPath); ClusterUtils.removeDir(spark, relsPath);
final Dataset<Project> projects = ClusterUtils.readPath(spark, graphPath + "/project", Project.class); final Dataset<OaBrokerProject> projects = ClusterUtils
.readPath(spark, graphPath + "/project", Project.class)
.filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
.map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class));
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels rels
.joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner") .joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner")
.map( .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class))
t -> new RelatedProject(
t._1.getSource(),
t._1.getRelType(),
ConversionUtils.oafProjectToBrokerProject(t._2)),
Encoders.bean(RelatedProject.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(relsPath); .json(relsPath);

View File

@ -17,9 +17,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
@ -32,8 +34,9 @@ public class PrepareRelatedPublicationsJob {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(PrepareRelatedPublicationsJob.class .toString(
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); PrepareRelatedPublicationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -60,19 +63,24 @@ public class PrepareRelatedPublicationsJob {
final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils
.readPath(spark, graphPath + "/publication", Publication.class) .readPath(spark, graphPath + "/publication", Publication.class)
.filter(p -> !ClusterUtils.isDedupRoot(p.getId())) .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
.map(ConversionUtils::oafPublicationToBrokerPublication, Encoders.bean(OaBrokerRelatedPublication.class)); .map(
ConversionUtils::oafPublicationToBrokerPublication,
Encoders.bean(OaBrokerRelatedPublication.class));
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels rels
.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> new RelatedPublication( .map(t -> {
t._1.getSource(), final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
t._1.getRelType(), rel.getRelPublication().setRelType(t._1.getRelClass());
t._2), Encoders.bean(RelatedPublication.class)) return rel;
}, Encoders.bean(RelatedPublication.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(relsPath); .json(relsPath);

View File

@ -17,9 +17,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.Software;
@ -32,8 +34,9 @@ public class PrepareRelatedSoftwaresJob {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(PrepareRelatedSoftwaresJob.class .toString(
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); PrepareRelatedSoftwaresJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -64,15 +67,14 @@ public class PrepareRelatedSoftwaresJob {
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels rels
.joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> new RelatedSoftware( .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class))
t._1.getSource(),
t._1.getRelType(),
t._2), Encoders.bean(RelatedSoftware.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(relsPath); .json(relsPath);

View File

@ -11,16 +11,15 @@ public class RelatedDataset implements Serializable {
* *
*/ */
private static final long serialVersionUID = 774487705184038324L; private static final long serialVersionUID = 774487705184038324L;
private String source; private String source;
private String relType;
private OaBrokerRelatedDataset relDataset; private OaBrokerRelatedDataset relDataset;
public RelatedDataset() { public RelatedDataset() {
} }
public RelatedDataset(final String source, final String relType, final OaBrokerRelatedDataset relDataset) { public RelatedDataset(final String source, final OaBrokerRelatedDataset relDataset) {
this.source = source; this.source = source;
this.relType = relType;
this.relDataset = relDataset; this.relDataset = relDataset;
} }
@ -32,14 +31,6 @@ public class RelatedDataset implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public OaBrokerRelatedDataset getRelDataset() { public OaBrokerRelatedDataset getRelDataset() {
return relDataset; return relDataset;
} }

View File

@ -13,15 +13,13 @@ public class RelatedProject implements Serializable {
private static final long serialVersionUID = 4941437626549329870L; private static final long serialVersionUID = 4941437626549329870L;
private String source; private String source;
private String relType;
private OaBrokerProject relProject; private OaBrokerProject relProject;
public RelatedProject() { public RelatedProject() {
} }
public RelatedProject(final String source, final String relType, final OaBrokerProject relProject) { public RelatedProject(final String source, final OaBrokerProject relProject) {
this.source = source; this.source = source;
this.relType = relType;
this.relProject = relProject; this.relProject = relProject;
} }
@ -33,14 +31,6 @@ public class RelatedProject implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public OaBrokerProject getRelProject() { public OaBrokerProject getRelProject() {
return relProject; return relProject;
} }

View File

@ -13,16 +13,13 @@ public class RelatedPublication implements Serializable {
private static final long serialVersionUID = 9021609640411395128L; private static final long serialVersionUID = 9021609640411395128L;
private String source; private String source;
private String relType;
private OaBrokerRelatedPublication relPublication; private OaBrokerRelatedPublication relPublication;
public RelatedPublication() { public RelatedPublication() {
} }
public RelatedPublication(final String source, final String relType, public RelatedPublication(final String source, final OaBrokerRelatedPublication relPublication) {
final OaBrokerRelatedPublication relPublication) {
this.source = source; this.source = source;
this.relType = relType;
this.relPublication = relPublication; this.relPublication = relPublication;
} }
@ -34,14 +31,6 @@ public class RelatedPublication implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public OaBrokerRelatedPublication getRelPublication() { public OaBrokerRelatedPublication getRelPublication() {
return relPublication; return relPublication;
} }

View File

@ -11,16 +11,15 @@ public class RelatedSoftware implements Serializable {
* *
*/ */
private static final long serialVersionUID = 7573383356943300157L; private static final long serialVersionUID = 7573383356943300157L;
private String source; private String source;
private String relType;
private OaBrokerRelatedSoftware relSoftware; private OaBrokerRelatedSoftware relSoftware;
public RelatedSoftware() { public RelatedSoftware() {
} }
public RelatedSoftware(final String source, final String relType, final OaBrokerRelatedSoftware relSoftware) { public RelatedSoftware(final String source, final OaBrokerRelatedSoftware relSoftware) {
this.source = source; this.source = source;
this.relType = relType;
this.relSoftware = relSoftware; this.relSoftware = relSoftware;
} }
@ -32,14 +31,6 @@ public class RelatedSoftware implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public OaBrokerRelatedSoftware getRelSoftware() { public OaBrokerRelatedSoftware getRelSoftware() {
return relSoftware; return relSoftware;
} }