bug fix: changed join in propagaterelations without applying filter on the id

miconis 2021-04-16 16:40:42 +02:00
parent 67085da305
commit 7ad573d023
2 changed files with 42 additions and 32 deletions
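In short: relation propagation used to fix the source and target ids in two separate processDataset passes and then kept only relations passing containsDedup; this commit replaces that with a single createNewRels step that indexes each relation by its source and target ids and left-outer joins both against the merged-ids table. Below is a minimal, self-contained sketch of that double left-outer joinWith pattern, with relations simplified to (source, target) pairs and mergedIds to (oldId, dedupId) pairs; all names and data are hypothetical, not the commit's code:

// Sketch of the double left-outer join used by the new createNewRels
// (simplified pair types; hypothetical data).
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class JoinSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate();

        // relations as (source, target) pairs
        Dataset<Tuple2<String, String>> rels = spark.createDataset(
            Arrays.asList(new Tuple2<>("A", "B"), new Tuple2<>("C", "D")),
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // merged ids as (oldId, dedupId) pairs
        Dataset<Tuple2<String, String>> mergedIds = spark.createDataset(
            Arrays.asList(new Tuple2<>("A", "dedup_1"), new Tuple2<>("D", "dedup_2")),
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // first left-outer join: match each relation's source id; every
        // relation survives, and a match carries the dedup id to rewrite to
        Dataset<Tuple2<Tuple2<String, String>, Tuple2<String, String>>> bySource = rels
            .joinWith(mergedIds, rels.col("_1").equalTo(mergedIds.col("_1")), "left_outer");

        // second left-outer join: match the relation's target id the same way
        bySource
            .joinWith(mergedIds, bySource.col("_1._2").equalTo(mergedIds.col("_1")), "left_outer")
            .show(false);

        spark.stop();
    }
}

Because both joins are left_outer, relations whose endpoints were never merged still come through, just paired with a null match; the mapping function then decides per endpoint whether to rewrite.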

SparkPropagateRelation.java

@@ -22,6 +22,7 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;
+import scala.Tuple3;
 
 public class SparkPropagateRelation extends AbstractSparkAction {
@@ -85,13 +86,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         Dataset<Relation> rels = spark.read().textFile(relationPath).map(patchRelFn(), Encoders.bean(Relation.class));
 
-        Dataset<Relation> newRels = processDataset(
-            processDataset(rels, mergedIds, FieldType.SOURCE, getFixRelFn(FieldType.SOURCE)),
-            mergedIds,
-            FieldType.TARGET,
-            getFixRelFn(FieldType.TARGET))
-            .filter(SparkPropagateRelation::containsDedup)
-            .distinct();
+        Dataset<Relation> newRels = createNewRels(rels, mergedIds, getFixRelFn());
 
         Dataset<Relation> updated = processDataset(
             processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()),
@@ -120,6 +115,26 @@ public class SparkPropagateRelation extends AbstractSparkAction {
             .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
     }
 
+    private static Dataset<Relation> createNewRels(
+        Dataset<Relation> rels,
+        Dataset<Tuple2<String, String>> mergedIds,
+        MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {
+
+        Dataset<Tuple3<String, Relation, String>> mapped = rels
+            .map(
+                (MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)),
+                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));
+
+        Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
+            .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");
+
+        return relSource
+            .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_2")), "left_outer")
+            .filter((FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r -> r._2() != null || r._1() != null)
+            .map(mapRel, Encoders.bean(Relation.class))
+            .distinct();
+    }
+
     private static Dataset<Relation> processDataset(
         Dataset<Relation> rels,
         Dataset<Tuple2<String, String>> mergedIds,
@@ -153,28 +168,25 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         }
     }
 
-    private static MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> getFixRelFn(
-        FieldType type) {
+    private static MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> getFixRelFn() {
         return value -> {
-            if (value._2() != null) {
-                Relation r = value._1()._2();
-                String id = value._2()._2();
-                if (r.getDataInfo() == null) {
-                    r.setDataInfo(new DataInfo());
-                }
-                r.getDataInfo().setDeletedbyinference(false);
-                switch (type) {
-                    case SOURCE:
-                        r.setSource(id);
-                        return r;
-                    case TARGET:
-                        r.setTarget(id);
-                        return r;
-                    default:
-                        throw new IllegalArgumentException("");
-                }
-            }
-            return value._1()._2();
+            Relation r = value._1()._1()._2();
+            String newSource = value._1()._2() != null ? value._1()._2()._2() : null;
+            String newTarget = value._2() != null ? value._2()._2() : null;
+
+            if (r.getDataInfo() == null) {
+                r.setDataInfo(new DataInfo());
+            }
+            r.getDataInfo().setDeletedbyinference(false);
+
+            if (newSource != null)
+                r.setSource(newSource);
+            if (newTarget != null)
+                r.setTarget(newTarget);
+
+            return r;
         };
     }
@@ -192,8 +204,4 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         };
     }
 
-    private static boolean containsDedup(final Relation r) {
-        return r.getSource().toLowerCase().contains(ModelConstants.DEDUP)
-            || r.getTarget().toLowerCase().contains(ModelConstants.DEDUP);
-    }
 }
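The reworked getFixRelFn above no longer needs a FieldType parameter or the SOURCE/TARGET switch: one pass over the joined tuples can rewrite both endpoints. A plain-Java sketch of that rewrite rule under simplified types follows; Rel is a hypothetical stand-in for eu.dnetlib.dhp.schema.oaf.Relation, and the dedup-id arguments play the role of the (possibly null) join matches:

// Sketch of the endpoint-rewrite rule in the new getFixRelFn, with
// the nested join tuples unpacked into plain arguments (all names
// hypothetical).
public class FixRelSketch {

    static final class Rel {
        String source;
        String target;

        Rel(String source, String target) {
            this.source = source;
            this.target = target;
        }

        @Override
        public String toString() {
            return source + " -> " + target;
        }
    }

    static Rel fixRel(Rel r, String srcDedupId, String tgtDedupId) {
        // an endpoint is rewritten only if its join found a match;
        // unmatched endpoints keep their original id
        if (srcDedupId != null) {
            r.source = srcDedupId;
        }
        if (tgtDedupId != null) {
            r.target = tgtDedupId;
        }
        return r;
    }

    public static void main(String[] args) {
        System.out.println(fixRel(new Rel("A", "B"), null, "dedup_1")); // A -> dedup_1
    }
}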

SparkDedupTest.java

@@ -12,6 +12,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
+import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -66,6 +67,7 @@ public class SparkDedupTest implements Serializable {
         testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
             .toAbsolutePath()
             .toString();
+
         testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
             .toAbsolutePath()
             .toString();
@@ -534,7 +536,7 @@ public class SparkDedupTest implements Serializable {
         long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
-        assertEquals(4862, relations);
+        assertEquals(4718, relations);
 
         // check deletedbyinference
         final Dataset<Relation> mergeRels = spark
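The expected relation count changes from 4862 to 4718 to match the output of the reworked propagation. For reference, a self-contained sketch of the count-the-output pattern this assertion relies on, writing line-delimited records under a temp dir and counting them back with JavaSparkContext.textFile (hypothetical paths and data, not the real test fixture):

// Minimal sketch of the counting pattern used by the assertion above.
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class CountSketch {
    public static void main(String[] args) throws Exception {
        // write two line-delimited relation records under a temp dir
        Path dir = Files.createTempDirectory("relation-");
        Files.write(dir.resolve("part-00000"),
            Arrays.asList("{\"source\":\"A\",\"target\":\"B\"}",
                          "{\"source\":\"A\",\"target\":\"C\"}"));

        // count them back, one record per line
        try (JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setMaster("local[*]").setAppName("count-sketch"))) {
            long relations = jsc.textFile(dir.toString()).count();
            System.out.println(relations); // 2
        }
    }
}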