diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
index ff7aca627..7984f0104 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
@@ -68,12 +68,9 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
 		log.info("Copying openorgs to the working dir");
 
 		final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
-		removeOutputDir(spark, outputPath);
 
 		final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
 
-		final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
-
 		filterOpenorgs(spark, entityPath)
 			.write()
 			.mode(SaveMode.Overwrite)
@@ -95,9 +92,13 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
 					.rdd(),
 				Encoders.bean(Organization.class));
 
-		entities.show();
+		log.info("Number of organization entities processed: {}", entities.count());
 
-		return entities.filter(entities.col("id").contains("openorgs____"));
+		entities = entities.filter(entities.col("id").contains("openorgs____"));
+
+		log.info("Number of Openorgs organization entities: {}", entities.count());
+
+		return entities;
 	}
 
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
index 4bb46222e..201043a08 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
@@ -74,8 +74,6 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 
 		final String outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
 
-		removeOutputDir(spark, outputPath);
-
 		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
 
 		DedupConfig dedupConf = getConfigurations(isLookUpService, actionSetId).get(0);
@@ -85,11 +83,13 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 			.textFile(relationPath)
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
-			.filter(this::isOpenorgs) // takes only relations coming from openorgs
-			.filter(this::filterOpenorgsRels) // takes only isSimilarTo relations between organizations from openorgs
-			.filter(this::excludeOpenorgsMesh) // excludes relations between an organization and an openorgsmesh
+			.filter(this::isOpenorgs)
+			.filter(this::filterOpenorgsRels)
+			.filter(this::excludeOpenorgsMesh)
 			.filter(this::excludeNonOpenorgs); // excludes relations with no openorgs id involved
 
+		log.info("Number of raw Openorgs Relations collected: {}", rawRels.count());
+
 		// turn openorgs isSimilarTo relations into mergerels
 		JavaRDD<Relation> mergeRelsRDD = rawRels.flatMap(rel -> {
 			List<Relation> mergerels = new ArrayList<>();
@@ -103,6 +103,8 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 			return mergerels.iterator();
 		});
 
+		log.info("Number of Openorgs Merge Relations created: {}", mergeRelsRDD.count());
+
 		spark
 			.createDataset(
 				mergeRelsRDD.rdd(),
@@ -134,7 +136,7 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 
 		if (rel.getCollectedfrom() != null) {
 			for (KeyValue k : rel.getCollectedfrom()) {
-				if (k.getValue().equals("OpenOrgs Database")) {
+				if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
 					return true;
 				}
 			}
@@ -144,7 +146,7 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 
 	private boolean excludeOpenorgsMesh(Relation rel) {
 
-		if (rel.getSource().equals("openorgsmesh") || rel.getTarget().equals("openorgsmesh")) {
+		if (rel.getSource().contains("openorgsmesh") || rel.getTarget().contains("openorgsmesh")) {
 			return false;
 		}
 		return true;
@@ -152,7 +154,7 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 
 	private boolean excludeNonOpenorgs(Relation rel) {
 
-		if (rel.getSource().equals("openorgs____") || rel.getTarget().equals("openorgs____")) {
+		if (rel.getSource().contains("openorgs____") || rel.getTarget().contains("openorgs____")) {
 			return true;
 		}
 		return false;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
index 64a110892..71bab79d0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
@@ -33,7 +33,7 @@ import scala.Tuple2;
 
 public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
 
-	private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
+	private static final Logger log = LoggerFactory.getLogger(SparkCopyRelationsNoOpenorgs.class);
 
 	public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -52,7 +52,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
 		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
 
-		new SparkUpdateEntity(parser, getSparkSession(conf))
+		new SparkCopyRelationsNoOpenorgs(parser, getSparkSession(conf))
 			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
 	}
 
@@ -69,14 +69,14 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
 		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
 		final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
 
-		removeOutputDir(spark, outputPath);
-
 		JavaRDD<Relation> simRels = spark
 			.read()
 			.textFile(relationPath)
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
-			.filter(this::excludeOpenorgsRels);
+			.filter(x -> !isOpenorgs(x));
+
+		log.info("Number of non-Openorgs relations collected: {}", simRels.count());
 
 		spark
 			.createDataset(simRels.rdd(), Encoders.bean(Relation.class))
@@ -96,15 +96,15 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
 		};
 	}
 
-	private boolean excludeOpenorgsRels(Relation rel) {
+	private boolean isOpenorgs(Relation rel) {
 
 		if (rel.getCollectedfrom() != null) {
 			for (KeyValue k : rel.getCollectedfrom()) {
-				if (k.getValue().equals("OpenOrgs Database")) {
-					return false;
+				if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
+					return true;
 				}
 			}
 		}
-		return true;
+		return false;
 	}
 
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java
deleted file mode 100644
index 4c0bfadf0..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java
+++ /dev/null
@@ -1,281 +0,0 @@
-
-package eu.dnetlib.dhp.oa.dedup;
-
-import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.DNET_PROVENANCE_ACTIONS;
-import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.PROVENANCE_ACTION_CLASS;
-import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.hash;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.graphx.Edge;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.dom4j.DocumentException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
-import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
-import eu.dnetlib.dhp.oa.dedup.model.Block;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.util.MapDocumentUtil;
-import scala.Tuple2;
-
-public class SparkRemoveDiffRels extends AbstractSparkAction {
-
-	private static final Logger log = LoggerFactory.getLogger(SparkRemoveDiffRels.class);
-
-	public SparkRemoveDiffRels(ArgumentApplicationParser parser, SparkSession spark) {
-		super(parser, spark);
-	}
-
-	public static void main(String[] args) throws Exception {
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
-		parser.parseArgument(args);
-
-		SparkConf conf = new SparkConf();
-		new SparkCreateSimRels(parser, getSparkSession(conf))
-			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
-	}
-
-	@Override
-	public void run(ISLookUpService isLookUpService)
-		throws DocumentException, IOException, ISLookUpException {
-
-		// read oozie parameters
-		final String graphBasePath = parser.get("graphBasePath");
-		final String isLookUpUrl = parser.get("isLookUpUrl");
-		final String actionSetId = parser.get("actionSetId");
-		final String workingPath = parser.get("workingPath");
-		final int numPartitions = Optional
-			.ofNullable(parser.get("numPartitions"))
-			.map(Integer::valueOf)
-			.orElse(NUM_PARTITIONS);
-
-		log.info("numPartitions: '{}'", numPartitions);
-		log.info("graphBasePath: '{}'", graphBasePath);
-		log.info("isLookUpUrl: '{}'", isLookUpUrl);
-		log.info("actionSetId: '{}'", actionSetId);
-		log.info("workingPath: '{}'", workingPath);
-
-		// for each dedup configuration
-		for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
-
-			final String entity = dedupConf.getWf().getEntityType();
-			final String subEntity = dedupConf.getWf().getSubEntityValue();
-			log.info("Removing diffrels for: '{}'", subEntity);
-
-			final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
-
-			final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
-
-			final String openorgsMergeRelsPath = DedupUtility
-				.createOpenorgsMergeRelsPath(workingPath, actionSetId, subEntity);
-
-			final int maxIterations = dedupConf.getWf().getMaxIterations();
-			log.info("Max iterations {}", maxIterations);
-
-			JavaRDD<Relation> mergeRelsRDD = spark
-				.read()
-				.load(mergeRelsPath)
-				.as(Encoders.bean(Relation.class))
-				.where("relClass == 'merges'")
-				.toJavaRDD();
-
-			System.out.println("mergeRelsRDD = " + mergeRelsRDD.count());
-
-//			JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRelsRDD = spark
-//				.read()
-//				.textFile(relationPath)
-//				.map(patchRelFn(), Encoders.bean(Relation.class))
-//				.toJavaRDD()
-//				.filter(r -> filterRels(r, entity))
-//				.map(rel -> {
-//					if (rel.getSource().compareTo(rel.getTarget()) < 0)
-//						return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
-//					else
-//						return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
-//				});
-			// THIS IS FOR TESTING PURPOSE
-			JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRelsRDD = spark
-				.read()
-				.load(mergeRelsPath)
-				.as(Encoders.bean(Relation.class))
-				.toJavaRDD()
-				.map(rel -> {
-					if (rel.getSource().compareTo(rel.getTarget()) < 0)
-						return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
-					else
-						return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
-				})
-				.distinct();
-
-			System.out.println("diffRelsRDD = " + diffRelsRDD.count());
-
-//			JavaRDD<Tuple2<Tuple2<String, String>, String>> flatMergeRels = mergeRelsRDD
-//				.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget()))
-//				.groupByKey()
-//				.flatMap(g -> {
-//					List<Tuple2<Tuple2<String, String>, String>> rels = new ArrayList<>();
-//
-//					List<String> ids = StreamSupport
-//						.stream(g._2().spliterator(), false)
-//						.collect(Collectors.toList());
-//
-//					for (int i = 0; i < ids.size(); i++) {
-//						for (int j = i + 1; j < ids.size(); j++) {
-//							if (ids.get(i).compareTo(ids.get(j)) < 0)
-//								rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1()));
-//							else
-//								rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1()));
-//						}
-//					}
-//					return rels.iterator();
-//
-//				});
-			JavaRDD<Tuple2<Tuple2<String, String>, String>> mergeRels = mergeRelsRDD
-				.map(rel -> {
-					if (rel.getSource().compareTo(rel.getTarget()) < 0)
-						return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "mergeRel");
-					else
-						return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "mergeRel");
-				});
-			System.out.println("mergeRelsProcessed = " + mergeRels.count());
-
-//			JavaRDD<Relation> purgedMergeRels = flatMergeRels
-//				.union(diffRelsRDD)
-//				.mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2())))
-//				.reduceByKey((a, b) -> {
-//					List<String> list = new ArrayList<>();
-//					list.addAll(a);
-//					list.addAll(b);
-//					return list;
-//				})
-//				.filter(rel -> rel._2().size() == 1)
-//				.mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1()))
-//				.flatMap(rel -> {
-//					List<Tuple2<String, String>> rels = new ArrayList<>();
-//					String source = rel._1();
-//					rels.add(new Tuple2<>(source, rel._2()._1()));
-//					rels.add(new Tuple2<>(source, rel._2()._2()));
-//					return rels.iterator();
-//				})
-//				.distinct()
-//				.flatMap(rel -> tupleToMergeRel(rel, dedupConf));
-			JavaRDD<Relation> purgedMergeRels = mergeRels
-				.union(diffRelsRDD)
-				.mapToPair(t -> new Tuple2<>(t._1()._1() + "|||" + t._1()._2(), t._2()))
-				.groupByKey()
-				.filter(g -> Iterables.size(g._2()) == 1)
-				.flatMap(
-					t -> tupleToMergeRel(
-						new Tuple2<>(t._1().split("\\|\\|\\|")[0], t._1().split("\\|\\|\\|")[1]),
-						dedupConf));
-
-			System.out.println("purgedMergeRels = " + purgedMergeRels.count());
-
-			spark
-				.createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.json(openorgsMergeRelsPath);
-		}
-	}
-
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
-
-	private boolean filterRels(Relation rel, String entityType) {
-
-		switch (entityType) {
-			case "result":
-				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
-					&& rel.getSubRelType().equals("dedup"))
-					return true;
-				break;
-			case "organization":
-				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
-					&& rel.getSubRelType().equals("dedup"))
-					return true;
-				break;
-			default:
-				return false;
-		}
-		return false;
-	}
-
-	public Iterator<Relation> tupleToMergeRel(Tuple2<String, String> rel, DedupConfig dedupConf) {
-
-		List<Relation> rels = new ArrayList<>();
-
-		rels.add(rel(rel._1(), rel._2(), "merges", dedupConf));
-		rels.add(rel(rel._2(), rel._1(), "isMergedIn", dedupConf));
-
-		return rels.iterator();
-	}
-
-	private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
-
-		String entityType = dedupConf.getWf().getEntityType();
-
-		Relation r = new Relation();
-		r.setSource(source);
-		r.setTarget(target);
-		r.setRelClass(relClass);
-		r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
-		r.setSubRelType("dedup");
-
-		DataInfo info = new DataInfo();
-		info.setDeletedbyinference(false);
-		info.setInferred(true);
-		info.setInvisible(false);
-		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
-		Qualifier provenanceAction = new Qualifier();
-		provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
-		provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
-		provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
-		provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
-		info.setProvenanceaction(provenanceAction);
-
-		// TODO calculate the trust value based on the similarity score of the elements in the CC
-		// info.setTrust();
-
-		r.setDataInfo(info);
-		return r;
-	}
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
index 339e99084..dc63d0a79 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
@@ -87,8 +87,8 @@
-
-
+
+
@@ -113,7 +113,7 @@
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
-			<arg>--actionSetId</arg><arg>${actionSetId}</arg>
+			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
 			<arg>--numPartitions</arg><arg>8000</arg>
@@ -142,7 +142,7 @@
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
-			<arg>--actionSetId</arg><arg>${actionSetId}</arg>
+			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--numPartitions</arg><arg>8000</arg>
@@ -169,7 +169,7 @@
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
 			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
-			<arg>--actionSetId</arg><arg>${actionSetId}</arg>
+			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
@@ -196,7 +196,7 @@
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
 			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
-			<arg>--actionSetId</arg><arg>${actionSetId}</arg>
+			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--dbUrl</arg><arg>${dbUrl}</arg>
 			<arg>--dbTable</arg><arg>${dbTable}</arg>
 			<arg>--dbUser</arg><arg>${dbUser}</arg>
@@ -227,7 +227,7 @@
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
 			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
-			<arg>--actionSetId</arg><arg>${actionSetId}</arg>
+			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--apiUrl</arg><arg>${apiUrl}</arg>
 			<arg>--dbUrl</arg><arg>${dbUrl}</arg>
 			<arg>--dbTable</arg><arg>${dbTable}</arg>
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 998f3ac21..c28a2a921 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -92,11 +92,23 @@
+
+
+
+
+
+
+
+
+
+
+
+
 			<master>yarn</master>
@@ -182,7 +194,7 @@
 			<master>yarn</master>
 			<mode>cluster</mode>
-			<name>Copy Merge Relations</name>
+			<name>Copy Openorgs Merge Relations</name>
 			<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels</class>
 			<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
@@ -201,7 +213,7 @@
 			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--numPartitions</arg><arg>8000</arg>
-
+
@@ -210,7 +222,7 @@
 			<master>yarn</master>
 			<mode>cluster</mode>
-			<name>Copy Entities</name>
+			<name>Copy Openorgs Entities</name>
 			<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgs</class>
 			<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
@@ -225,7 +237,6 @@
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
-			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
 			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
@@ -262,7 +273,7 @@
 			<master>yarn</master>
 			<mode>cluster</mode>
-			<name>Update Entity</name>
+			<name>Copy Non-Openorgs Relations</name>
 			<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs</class>
 			<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
index f8627d023..6ad2145a9 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
@@ -19,9 +19,11 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -61,7 +63,7 @@ public class SparkOpenorgsTest implements Serializable {
 	public static void cleanUp() throws IOException, URISyntaxException {
 
 		testGraphBasePath = Paths
-			.get(SparkOpenorgsTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
+			.get(SparkOpenorgsTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs").toURI())
 			.toFile()
 			.getAbsolutePath();
 		testOutputBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-")
@@ -71,9 +73,8 @@ public class SparkOpenorgsTest implements Serializable {
 			.toAbsolutePath()
 			.toString();
 
-//		FileUtils.deleteDirectory(new File(testOutputBasePath));
+		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
-		FileUtils.deleteDirectory(new File("/tmp/test-orchestrator/organization_openorgs_mergerels"));
 
 		final SparkConf conf = new SparkConf();
 		conf.set("spark.sql.shuffle.partitions", "200");
@@ -133,7 +134,7 @@ public class SparkOpenorgsTest implements Serializable {
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
 			.count();
 
-		assertEquals(0, orgs_deduprecord);
+		assertEquals(100, orgs_deduprecord);
 	}
 
 	@Test
@@ -161,7 +162,7 @@ public class SparkOpenorgsTest implements Serializable {
 			.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
 			.count();
 
-		assertEquals(0, orgs_mergerel);
+		assertEquals(6, orgs_mergerel);
 
 	}
 
@@ -190,67 +191,7 @@ public class SparkOpenorgsTest implements Serializable {
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
 			.count();
 
-		System.out.println("orgs_simrel = " + orgs_simrel);
-	}
-
-	@Test
-	public void createSimRelsTest() throws Exception {
-
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
-		parser
-			.parseArgument(
-				new String[] {
-					"-i", testGraphBasePath,
-					"-asi", testActionSetId,
-					"-la", "lookupurl",
-					"-w", "/tmp",
-					"-np", "50"
-				});
-
-		new SparkCreateSimRels(parser, spark).run(isLookUpService);
-
-		long orgs_simrel = spark
-			.read()
-			.textFile("/tmp/" + testActionSetId + "/organization_simrel")
-			.count();
-
-		assertEquals(3082, orgs_simrel);
-	}
-
-	@Test
-	public void createMergeRelsTest() throws Exception {
-
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateMergeRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
-		parser
-			.parseArgument(
-				new String[] {
-					"-i",
-					testGraphBasePath,
-					"-asi",
-					testActionSetId,
-					"-la",
-					"lookupurl",
-					"-w",
-					"/tmp"
-				});
-
-		new SparkCreateMergeRels(parser, spark).run(isLookUpService);
-
-		long orgs_mergerel = spark
-			.read()
-			.load("/tmp/" + testActionSetId + "/organization_mergerel")
-			.count();
-		assertEquals(1272, orgs_mergerel);
+		assertEquals(96, orgs_simrel);
 	}
 
 	@Test
@@ -273,9 +214,7 @@ public class SparkOpenorgsTest implements Serializable {
 
 		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-//		Dataset<Relation> relsRDD = spark.read().textFile(testDedupGraphBasePath + "/relation").map(patchRelFn(), Encoders.bean(Relation.class));
-
-		assertEquals(500, relations);
+		assertEquals(400, relations);
 	}
 
 	@AfterAll
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/organization/organization.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/organization/organization.gz
new file mode 100644
index 000000000..45b0edeb2
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/organization/organization.gz differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/relation/relation.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/relation/relation.gz
new file mode 100644
index 000000000..f0c7f4211
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/relation/relation.gz differ