join methods

This commit is contained in:
Michele Artini 2020-06-19 15:24:30 +02:00
parent 4822747313
commit d88fe0ac84
2 changed files with 75 additions and 55 deletions

View File

@ -28,12 +28,17 @@ import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
@ -84,11 +89,8 @@ public class GenerateEventsApplication {
removeOutputDir(spark, eventsPath);
// TODO REMOVE THIS
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache();
relatedEntities(projects, rels, RelatedProject.class)
relatedProjects(spark, graphPath)
.write()
.mode(SaveMode.Overwrite)
.json(eventsPath);
@ -144,7 +146,6 @@ public class GenerateEventsApplication {
final String graphPath,
final Class<SRC> sourceClass) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
// final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
// spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
@ -160,25 +161,78 @@ public class GenerateEventsApplication {
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
// TODO UNCOMMENT THIS
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
// final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels,
// RelatedSoftware.class));
// final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels,
// RelatedDataset.class));
// final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels,
// RelatedPublication.class));;
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedProjects(spark, graphPath));
// final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedDataset(spark, graphPath));
// final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedPublications(spark, graphPath));
// final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedSoftwares(spark, graphPath));
return r0; // TODO it should be r4
return r1; // TODO it should be r4
}
private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets,
final Dataset<Relation> rels,
final Class<RT> clazz) {
private static Dataset<RelatedProject> relatedProjects(final SparkSession spark, final String graphPath) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT));
return rels
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
.joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner")
.map(
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
Encoders.bean(clazz));
t -> new RelatedProject(
t._1.getSource(),
t._1.getRelType(),
ConversionUtils.oafProjectToBrokerProject(t._2)),
Encoders.bean(RelatedProject.class));
}
private static Dataset<RelatedDataset> relatedDataset(final SparkSession spark, final String graphPath) {
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class);
return rels
.joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner")
.map(
t -> new RelatedDataset(
t._1.getSource(),
t._1.getRelType(),
ConversionUtils.oafDatasetToBrokerDataset(t._2)),
Encoders.bean(RelatedDataset.class));
}
private static Dataset<RelatedSoftware> relatedSoftwares(final SparkSession spark, final String graphPath) {
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class);
return rels
.joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner")
.map(
t -> new RelatedSoftware(
t._1.getSource(),
t._1.getRelType(),
ConversionUtils.oafSoftwareToBrokerSoftware(t._2)),
Encoders.bean(RelatedSoftware.class));
}
private static Dataset<RelatedPublication> relatedPublications(final SparkSession spark, final String graphPath) {
final Dataset<Publication> pubs = readPath(spark, graphPath + "/publication", Publication.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class);
return rels
.joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner")
.map(
t -> new RelatedPublication(
t._1.getSource(),
t._1.getRelType(),
ConversionUtils.oafPublicationToBrokerPublication(t._2)),
Encoders.bean(RelatedPublication.class));
}
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,

View File

@ -1,34 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
public class RelatedEntityFactory {
@SuppressWarnings("unchecked")
public static <RT, T> RT newRelatedEntity(final String sourceId,
final String relType,
final T target,
final Class<RT> clazz) {
if (clazz == RelatedProject.class) {
return (RT) new RelatedProject(sourceId, relType,
ConversionUtils.oafProjectToBrokerProject((Project) target));
} else if (clazz == RelatedSoftware.class) {
return (RT) new RelatedSoftware(sourceId, relType,
ConversionUtils.oafSoftwareToBrokerSoftware((Software) target));
} else if (clazz == RelatedDataset.class) {
return (RT) new RelatedDataset(sourceId, relType,
ConversionUtils.oafDatasetToBrokerDataset((Dataset) target));
} else if (clazz == RelatedPublication.class) {
return (RT) new RelatedPublication(sourceId, relType,
ConversionUtils.oafPublicationToBrokerPublication((Publication) target));
} else {
return null;
}
}
}