fixed a problem with join

This commit is contained in:
Michele Artini 2020-12-15 08:30:26 +01:00
parent 26a941315a
commit e3e0ab1de1
8 changed files with 32 additions and 10 deletions

View File

@ -63,7 +63,7 @@ public class PrepareGroupsJob {
.readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class);
final Dataset<Relation> mergedRels = ClusterUtils final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .loadRelations(graphPath, spark)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()

View File

@ -62,7 +62,7 @@ public class PrepareRelatedDatasetsJob {
.map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class)); .map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class));
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
@ -72,7 +72,8 @@ public class PrepareRelatedDatasetsJob {
final Dataset<RelatedDataset> dataset = rels final Dataset<RelatedDataset> dataset = rels
.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> { .map(t -> {
final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2); final RelatedDataset rel = new RelatedDataset(t._1.getSource(),
t._2);
rel.getRelDataset().setRelType(t._1.getRelClass()); rel.getRelDataset().setRelType(t._1.getRelClass());
return rel; return rel;
}, Encoders.bean(RelatedDataset.class)); }, Encoders.bean(RelatedDataset.class));

View File

@ -64,7 +64,7 @@ public class PrepareRelatedProjectsJob {
.map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class)); .map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class));
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))

View File

@ -65,7 +65,7 @@ public class PrepareRelatedPublicationsJob {
Encoders.bean(OaBrokerRelatedPublication.class)); Encoders.bean(OaBrokerRelatedPublication.class));
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
@ -75,7 +75,8 @@ public class PrepareRelatedPublicationsJob {
final Dataset<RelatedPublication> dataset = rels final Dataset<RelatedPublication> dataset = rels
.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> { .map(t -> {
final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2); final RelatedPublication rel = new RelatedPublication(
t._1.getSource(), t._2);
rel.getRelPublication().setRelType(t._1.getRelClass()); rel.getRelPublication().setRelType(t._1.getRelClass());
return rel; return rel;
}, Encoders.bean(RelatedPublication.class)); }, Encoders.bean(RelatedPublication.class));

View File

@ -64,7 +64,7 @@ public class PrepareRelatedSoftwaresJob {
.map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class)); .map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class));
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))

View File

@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class ClusterUtils { public class ClusterUtils {
@ -30,6 +31,16 @@ public class ClusterUtils {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
public static Dataset<Relation> loadRelations(final String graphPath, final SparkSession spark) {
return ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.map(r -> {
r.setSource(ConversionUtils.cleanOpenaireId(r.getSource()));
r.setTarget(ConversionUtils.cleanOpenaireId(r.getTarget()));
return r;
}, Encoders.bean(Relation.class));
}
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(
final SparkSession spark, final SparkSession spark,
final String inputPath, final String inputPath,

View File

@ -129,7 +129,7 @@ public class ConversionUtils {
return res; return res;
} }
private static String cleanOpenaireId(final String id) { public static String cleanOpenaireId(final String id) {
return id.contains("|") ? StringUtils.substringAfter(id, "|") : id; return id.contains("|") ? StringUtils.substringAfter(id, "|") : id;
} }

View File

@ -59,9 +59,18 @@ public class DatasourceRelationsAccumulator implements Serializable {
final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator(); final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
collectedFromSet collectedFromSet
.stream() .stream()
.map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL)) .map(
s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s),
BrokerConstants.COLLECTED_FROM_REL))
.forEach(res::addTuple); .forEach(res::addTuple);
hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
hostedBySet
.stream()
.map(
s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s),
BrokerConstants.HOSTED_BY_REL))
.forEach(res::addTuple);
return res; return res;
} }