diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
index dd74677bc9..eaa00c4b8a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
@@ -78,42 +78,16 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {

 		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");

-		DedupConfig dedupConf = getConfigurations(isLookUpService, actionSetId).get(0);
-
-		JavaRDD<Relation> rawRels = spark
+		// collect organization merge relations from the openorgs database
+		JavaRDD<Relation> mergeRelsRDD = spark
 			.read()
 			.textFile(relationPath)
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
-			.filter(this::isOpenorgs)
-			.filter(this::filterOpenorgsRels);
+			.filter(this::isOpenorgs) // take only openorgs relations
+			.filter(this::isMergeRel); // take only merges and isMergedIn relations

-		JavaRDD<Relation> selfRawRels = rawRels
-			.map(r -> r.getSource())
-			.distinct()
-			.map(s -> rel(s, s, ModelConstants.IS_SIMILAR_TO, dedupConf));
-
-		log.info("Number of raw Openorgs Relations collected: {}", rawRels.count());
-
-		// turn openorgs isSimilarTo relations into mergerels
-		JavaRDD<Relation> mergeRelsRDD = rawRels
-			.union(selfRawRels)
-			.map(r -> {
-				r.setSource(createDedupID(r.getSource())); // create the dedup_id to align it to the openaire dedup
-				// format
-				return r;
-			})
-			.flatMap(rel -> {
-
-				List<Relation> mergerels = new ArrayList<>();
-
-				mergerels.add(rel(rel.getSource(), rel.getTarget(), ModelConstants.MERGES, dedupConf));
-				mergerels.add(rel(rel.getTarget(), rel.getSource(), ModelConstants.IS_MERGED_IN, dedupConf));
-
-				return mergerels.iterator();
-			});
-
-		log.info("Number of Openorgs Merge Relations created: {}", mergeRelsRDD.count());
+		log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());

 		spark
 			.createDataset(
@@ -124,45 +98,9 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 			.parquet(outputPath);
 	}

-	private boolean filterOpenorgsRels(Relation rel) {
-		return rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
+	private boolean isMergeRel(Relation rel) {
+		return (rel.getRelClass().equals(ModelConstants.MERGES)
+			|| rel.getRelClass().equals(ModelConstants.IS_MERGED_IN))
 			&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
 			&& rel.getSubRelType().equals(ModelConstants.DEDUP);
 	}
-
-	private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
-
-		String entityType = dedupConf.getWf().getEntityType();
-
-		Relation r = new Relation();
-		r.setSource(source);
-		r.setTarget(target);
-		r.setRelClass(relClass);
-		r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
-		r.setSubRelType(ModelConstants.DEDUP);
-
-		DataInfo info = new DataInfo();
-		info.setDeletedbyinference(false);
-		info.setInferred(true);
-		info.setInvisible(false);
-		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
-		Qualifier provenanceAction = new Qualifier();
-		provenanceAction.setClassid(ModelConstants.PROVENANCE_DEDUP);
-		provenanceAction.setClassname(ModelConstants.PROVENANCE_DEDUP);
-		provenanceAction.setSchemeid(ModelConstants.DNET_PROVENANCE_ACTIONS);
-		provenanceAction.setSchemename(ModelConstants.DNET_PROVENANCE_ACTIONS);
-		info.setProvenanceaction(provenanceAction);
-
-		// TODO calculate the trust value based on the similarity score of the elements in the CC
-		// info.setTrust();
-
-		r.setDataInfo(info);
-		return r;
-	}
-
-	public String createDedupID(String id) {
-
-		String prefix = id.split("\\|")[0];
-		return prefix + "|dedup_wf_001::" + DHPUtils.md5(id);
-	}
 }
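Note: after this change the job no longer synthesizes merge relations out of openorgs isSimilarTo rels; it expects the merges/isMergedIn pair to arrive ready-made from the openorgs database. A hypothetical sketch of the relation pair that isMergeRel() lets through — the method name exampleMergeRelPair and the identifiers are illustrative placeholders, not real openorgs ids:

    // Hypothetical sketch of the relation pair isMergeRel() accepts;
    // ids below are illustrative placeholders, not real openorgs identifiers.
    static List<Relation> exampleMergeRelPair() {
        Relation merges = new Relation();
        merges.setSource("20|openorgs____::rootId"); // dedup root (illustrative)
        merges.setTarget("20|openorgs____::dupId"); // merged duplicate (illustrative)
        merges.setRelClass(ModelConstants.MERGES);
        merges.setRelType(ModelConstants.ORG_ORG_RELTYPE);
        merges.setSubRelType(ModelConstants.DEDUP);

        Relation isMergedIn = new Relation(); // inverse direction of the same merge
        isMergedIn.setSource("20|openorgs____::dupId");
        isMergedIn.setTarget("20|openorgs____::rootId");
        isMergedIn.setRelClass(ModelConstants.IS_MERGED_IN);
        isMergedIn.setRelType(ModelConstants.ORG_ORG_RELTYPE);
        isMergedIn.setSubRelType(ModelConstants.DEDUP);

        return Arrays.asList(merges, isMergedIn);
    }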
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java
similarity index 58%
rename from dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
rename to dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java
index 543558f367..38f9f03e88 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java
@@ -4,8 +4,10 @@ package eu.dnetlib.dhp.oa.dedup;
 import java.io.IOException;
 import java.util.Optional;

+import eu.dnetlib.dhp.schema.oaf.Relation;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -24,11 +26,12 @@ import eu.dnetlib.dhp.schema.oaf.Organization;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import scala.Tuple2;

-public class SparkCopyOpenorgs extends AbstractSparkAction {
-	private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgs.class);
+public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
+	private static final Logger log = LoggerFactory.getLogger(SparkCreateOrgsDedupRecord.class);

-	public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
+	public SparkCreateOrgsDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
 	}

@@ -36,13 +39,13 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
-					SparkCopyOpenorgs.class
+					SparkCreateOrgsDedupRecord.class
 						.getResourceAsStream(
 							"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
 		parser.parseArgument(args);

 		SparkConf conf = new SparkConf();
-		new SparkCopyOpenorgs(parser, getSparkSession(conf))
+		new SparkCreateOrgsDedupRecord(parser, getSparkSession(conf))
 			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
 	}

@@ -64,14 +67,15 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
 		log.info("actionSetId: '{}'", actionSetId);
 		log.info("workingPath: '{}'", workingPath);

-		String subEntity = "organization";
-		log.info("Copying openorgs to the working dir");
+		log.info("Copying organization dedup records to the working dir");

-		final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
+		final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, "organization");

-		final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
+		final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");

-		filterOpenorgs(spark, entityPath)
+		final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
+
+		rootOrganization(spark, entityPath, mergeRelsPath)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
@@ -79,26 +83,43 @@

 	}

-	public static Dataset<Organization> filterOpenorgs(
+	public static Dataset<Organization> rootOrganization(
 		final SparkSession spark,
-		final String entitiesInputPath) {
+		final String entitiesInputPath,
+		final String mergeRelsPath) {

 		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

-		Dataset<Organization> entities = spark
-			.createDataset(
-				sc
-					.textFile(entitiesInputPath)
-					.map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
-					.rdd(),
-				Encoders.bean(Organization.class));
+		JavaPairRDD<String, Organization> entities = sc
+			.textFile(entitiesInputPath)
+			.map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
+			.mapToPair(o -> new Tuple2<>(o.getId(), o));

 		log.info("Number of organization entities processed: {}", entities.count());

-		entities = entities.filter(entities.col("id").contains(DedupUtility.OPENORGS_ID_PREFIX));
+		// collect root ids (the ids appearing as source of 'merges' relations)
+		JavaPairRDD<String, String> roots = spark
+			.read()
+			.load(mergeRelsPath)
+			.as(Encoders.bean(Relation.class))
+			.where("relClass == 'merges'")
+			.map(
+				(MapFunction<Relation, Tuple2<String, String>>) r -> new Tuple2<>(r.getSource(), "root"),
+				Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+			.toJavaRDD()
+			.mapToPair(t -> t)
+			.distinct();

-		log.info("Number of Openorgs organization entities: {}", entities.count());
+		Dataset<Organization> rootOrgs = spark
+			.createDataset(
+				entities
+					.leftOuterJoin(roots)
+					.filter(e -> e._2()._2().isPresent()) // joined with 'root' means it is a root record
+					.map(e -> e._2()._1())
+					.rdd(),
+				Encoders.bean(Organization.class));

-		return entities;
+		log.info("Number of root organizations: {}", rootOrgs.count());
+
+		return rootOrgs;
 	}
 }
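Note: the root-record selection in rootOrganization() hinges on the leftOuterJoin: every organization is paired with an Optional that is present only when its id also occurs as the source of a 'merges' relation. A condensed sketch of the same pattern, assuming orgsById and rootIds are built exactly as above (the helper name keepRoots is illustrative):

    // Condensed sketch of the root-keeping join; orgsById = (orgId, organization)
    // and rootIds = (rootId, "root"), both built as in rootOrganization() above.
    static JavaRDD<Organization> keepRoots(
        JavaPairRDD<String, Organization> orgsById,
        JavaPairRDD<String, String> rootIds) {
        return orgsById
            .leftOuterJoin(rootIds) // (id, (org, Optional<String>))
            .filter(e -> e._2()._2().isPresent()) // a present value marks a merge root
            .map(e -> e._2()._1()); // keep only the organization payload
    }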
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index 5ebc00d5aa..1e13d01114 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -101,6 +101,9 @@ public class SparkUpdateEntity extends AbstractSparkAction {
 					.mapToPair(
 						(PairFunction<String, String, String>) s -> new Tuple2<>(
 							MapDocumentUtil.getJPathString(IDJSONPATH, s), s));

+				if (type == EntityType.organization) // exclude root records from organizations
+					entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
+
 				JavaRDD<String> map = entitiesWithId
 					.leftOuterJoin(mergedIds)
 					.map(k -> {
@@ -110,13 +113,6 @@ public class SparkUpdateEntity extends AbstractSparkAction {
 						return k._2()._1();
 					});

-				if (type == EntityType.organization) // exclude openorgs with deletedbyinference=true
-					map = map.filter(it -> {
-						Organization org = OBJECT_MAPPER.readValue(it, Organization.class);
-						return !org.getId().contains("openorgs____") || (org.getId().contains("openorgs____")
-							&& !org.getDataInfo().getDeletedbyinference());
-					});
-
 				sourceEntity = map.union(sc.textFile(dedupRecordPath));
 			}

@@ -159,4 +155,20 @@
 			throw new RuntimeException("Unable to convert json", e);
 		}
 	}
+
+	private static JavaPairRDD<String, String> excludeRootOrgs(JavaPairRDD<String, String> entitiesWithId, Dataset<Relation> rel) {
+
+		JavaPairRDD<String, String> roots = rel
+			.where("relClass == 'merges'")
+			.select(rel.col("source"))
+			.distinct()
+			.toJavaRDD()
+			.mapToPair(
+				(PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "root"));
+
+		return entitiesWithId
+			.leftOuterJoin(roots)
+			.filter(e -> !e._2()._2().isPresent())
+			.mapToPair(e -> new Tuple2<>(e._1(), e._2()._1()));
+	}
 }
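Note: excludeRootOrgs() is the mirror image of the selection in SparkCreateOrgsDedupRecord — the same leftOuterJoin against the sources of 'merges' relations, with the predicate negated — so the two jobs partition organizations cleanly: root records reach the output only through the dedup-record path, never through the entity stream. Sketch of the two complementary predicates (using Spark's java.function.Function and java.Optional; variable names are illustrative):

    // Sketch: the complementary filters applied after the same leftOuterJoin,
    // where each element is a Tuple2<String, Tuple2<String, Optional<String>>>.
    Function<Tuple2<String, Tuple2<String, Optional<String>>>, Boolean> isRoot =
        e -> e._2()._2().isPresent(); // SparkCreateOrgsDedupRecord keeps these
    Function<Tuple2<String, Tuple2<String, Optional<String>>>, Boolean> isNotRoot =
        e -> !e._2()._2().isPresent(); // SparkUpdateEntity keeps these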
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 4b39cb56a7..b86bc009cd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -187,7 +187,7 @@
 [hunk content lost: the XML markup of this one-line transition change was stripped during extraction; only the text node "yarn" of the following context survives]
@@ -220,7 +220,7 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Create Organizations Dedup Records</name>
-            <class>eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord</class>
+            <class>eu.dnetlib.dhp.oa.dedup.SparkCreateOrgsDedupRecord</class>
             <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory=${sparkExecutorMemory}
@@ -241,33 +241,6 @@
 [hunk content lost: 27 deleted lines — an entire workflow action — whose XML markup was stripped during extraction; the next action's <master>yarn</master> context follows]
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
index 419be1da31..ac2b296581 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
@@ -12,10 +12,14 @@ import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;

+import eu.dnetlib.dhp.schema.oaf.Relation;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -148,9 +152,14 @@ public class SparkOpenorgsTest implements Serializable {

 		long orgs_mergerel = spark
 			.read()
-			.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
+			.load(DedupUtility.createMergeRelPath(testOutputBasePath, testActionSetId, "organization"))
 			.count();

+		Dataset<Relation> orgrels = spark.read().load(DedupUtility.createMergeRelPath(testOutputBasePath, testActionSetId, "organization")).as(Encoders.bean(Relation.class));
+
+		for (Relation r : orgrels.toJavaRDD().collect())
+			System.out.println("r = " + r.getSource() + "---" + r.getTarget() + "---" + r.getRelClass());
+
 		assertEquals(384, orgs_mergerel);
 	}

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index 19dcde3bda..270c90913d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -180,14 +180,14 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 				log.info("Processing Openorgs Merge Rels...");
 				smdbe.execute("queryOpenOrgsSimilarityForProvision.sql", smdbe::processOrgOrgSimRels);
-
+				// TODO change the relation mapping so that it creates merges and isMergedIn relations
+				// TODO (specific to this case: the mapping function will be used as-is in the openorgs dedup case)
 				break;

 			case openaire_organizations:

 				log.info("Processing Organizations...");
 				smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
-
 				break;
 		}

 		log.info("All done.");
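Note: the first TODO above asks the mapping behind processOrgOrgSimRels to emit the same relClass pair that SparkCopyOpenorgsMergeRels.isMergeRel() now expects. A hypothetical sketch of what it would produce per approved duplicate pair — rel(source, target, relClass) stands for whichever relation factory the mapping ends up using and is not an existing method of this class:

    // Hypothetical: for each approved (rootId, dupId) pair, emit both directions,
    // mirroring the flatMap removed from SparkCopyOpenorgsMergeRels; rel(...) is
    // an illustrative placeholder, not part of MigrateDbEntitiesApplication.
    List<Relation> out = new ArrayList<>();
    out.add(rel(rootId, dupId, ModelConstants.MERGES));
    out.add(rel(dupId, rootId, ModelConstants.IS_MERGED_IN));
    return out;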
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql
index 42ba0cf916..73e39f8fa4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql
@@ -50,4 +50,7 @@ GROUP BY
 	o.trust,
 	d.id,
 	d.officialname,
-	o.country;
\ No newline at end of file
+	o.country;
+
+-- TODO change this so as to merge the pids of all the merged records (for openorgs, only the approved ones)
+-- TODO for all the other records with duplicates, instead, do not do this
\ No newline at end of file