diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
index 732ba2c2d..d853f3858 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
@@ -30,25 +30,17 @@ import scala.Tuple2;
 
 public class PrepareInfo implements Serializable {
-	// leggo le relazioni e seleziono quelle fra result ed organizzazioni
-	// raggruppo per result e salvo
-	// r => {o1, o2, o3}
-
-	// leggo le relazioni fra le organizzazioni e creo la gerarchia delle parentele:
-	// hashMap key organizzazione -> value tutti i suoi padri
-	// o => {p1, p2}
-
-
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 	private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class);
 
 	// associate orgs with all their parent
-	private static final String relOrgQuery = "SELECT target key, collect_set(source) as valueSet " +
+	private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " +
 		"FROM relation " +
 		"WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() +
 		"' and datainfo.deletedbyinference = false " +
 		"GROUP BY target";
 
-	private static final String relResQuery = "SELECT source key, collect_set(target) as valueSet " +
+	//associates results with all the orgs they are affiliated to
+	private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " +
 		"FROM relation " +
 		"WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() +
 		"' and datainfo.deletedbyinference = false " +
@@ -101,7 +93,7 @@ public class PrepareInfo implements Serializable {
 		relation.createOrReplaceTempView("relation");
 		spark
-			.sql(relOrgQuery)
+			.sql(ORGANIZATION_ORGANIZATION_QUERY)
 			.as(Encoders.bean(KeyValueSet.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.json(childParentOrganizationPath);
@@ -109,7 +101,7 @@ public class PrepareInfo implements Serializable {
 
 		spark
-			.sql(relResQuery)
+			.sql(RESULT_ORGANIZATION_QUERY)
 			.as(Encoders.bean(KeyValueSet.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.json(resultOrganizationPath);
@@ -130,7 +122,7 @@ public class PrepareInfo implements Serializable {
 					"' and datainfo.deletedbyinference = false")
 			.as(Encoders.STRING());
 
-		// prendo dalla join i risultati che hanno solo il lato sinistro: sono foglie
+		// takes from the join the entities having only the left hand side: the leaves. Saves them
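Reviewer note (illustrative, not part of the patch): the renamed ORGANIZATION_ORGANIZATION_QUERY groups IS_PARENT_OF relations by child organization and collects the set of its direct parents into a (key, valueSet) row; RESULT_ORGANIZATION_QUERY does the same for HAS_AUTHOR_INSTITUTION, keyed by result. A minimal, self-contained sketch of that collect_set shape, using an inline VALUES table instead of the real relation input and a hard-coded lowercased relclass value (all names here are stand-ins, not project classes):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CollectSetSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("collect-set-sketch").getOrCreate();

		// toy relation table: org1 and org3 are both parents of org2
		spark
			.sql(
				"SELECT * FROM VALUES " +
					"('org1', 'org2', 'isparentof'), " +
					"('org3', 'org2', 'isparentof') AS relation(source, target, relclass)")
			.createOrReplaceTempView("relation");

		// same shape as ORGANIZATION_ORGANIZATION_QUERY: one row per child org with the set of its parents
		Dataset<Row> childToParents = spark
			.sql(
				"SELECT target key, collect_set(source) as valueSet " +
					"FROM relation " +
					"WHERE lower(relclass) = 'isparentof' " +
					"GROUP BY target");

		childToParents.show(false); // expected: org2 -> [org1, org3] (set order not guaranteed)

		spark.stop();
	}
}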
 		children
 			.joinWith(parent, children.col("child").equalTo(parent.col("parent")), "left")
 			.map((MapFunction<Tuple2<String, String>, String>) value -> {
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
index a900fe0d8..f5d8361d7 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
@@ -32,6 +32,7 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
 public class SparkResultToOrganizationFromSemRel implements Serializable {
 	private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
 	private static final int MAX_ITERATION = 5;
+	public static final String NEW_RELATION_PATH = "/newRelation";
 
 	public static void main(String[] args) throws Exception {
 
@@ -120,11 +121,11 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 			iteration++;
 			StepActions
 				.execStep(
-					spark, graphPath, workingPath + "/newRelation",
+					spark, graphPath, workingPath + NEW_RELATION_PATH,
 					leavesPath, childParentPath, resultOrganizationPath);
 			StepActions
 				.prepareForNextStep(
-					spark, workingPath + "/newRelation", resultOrganizationPath, leavesPath,
+					spark, workingPath + NEW_RELATION_PATH, resultOrganizationPath, leavesPath,
 					childParentPath, workingPath + "/leaves", workingPath + "/resOrg");
 			moveOutput(spark, workingPath, leavesPath, resultOrganizationPath);
 			leavesCount = readPath(spark, leavesPath, Leaves.class).count();
@@ -154,7 +155,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 			propagationCounter.getNotReachedFirstParent().add(1);
 		}
 
-		addNewRelations(spark, workingPath + "/newRelation", outputPath);
+		addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
 	}
 
 	private static void moveOutput(SparkSession spark, String workingPath, String leavesPath,
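Reviewer note (illustrative, not part of the patch): NEW_RELATION_PATH names the working directory each iteration of the propagation loop writes to; the loop keeps extending result-to-organization links upward through the IS_PARENT_OF hierarchy until no leaves remain or MAX_ITERATION is reached. A plain-Java sketch of that fixed-point idea, with toy maps standing in for the Spark datasets (names and values are assumptions for illustration only):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PropagationSketch {
	public static void main(String[] args) {
		// child org -> its direct parent orgs (org0 isParentOf org1, org1 isParentOf org2)
		Map<String, Set<String>> childToParents = Map.of(
			"org2", Set.of("org1"),
			"org1", Set.of("org0"));

		// result -> orgs it is affiliated to (seed: only the directly linked org)
		Map<String, Set<String>> resultToOrgs = new HashMap<>();
		resultToOrgs.put("result1", new HashSet<>(Set.of("org2")));

		int maxIteration = 5; // mirrors MAX_ITERATION: a cap on how far up the hierarchy we walk
		for (int i = 0; i < maxIteration; i++) {
			boolean changed = false;
			for (Set<String> orgs : resultToOrgs.values()) {
				Set<String> parents = new HashSet<>();
				for (String org : orgs) {
					parents.addAll(childToParents.getOrDefault(org, Set.of()));
				}
				changed |= orgs.addAll(parents);
			}
			if (!changed) {
				break; // fixed point reached before the cap, like the no-leaves-left exit of the job
			}
		}

		System.out.println(resultToOrgs); // result1 ends up linked to org2, org1 and org0
	}
}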
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
index affe1f56f..d190e5342 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
@@ -27,7 +27,6 @@ import scala.Tuple2;
 
 public class StepActions implements Serializable {
 
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-	private static final Logger log = LoggerFactory.getLogger(StepActions.class);
 
 	public static void execStep(SparkSession spark, String graphPath, String newRelationPath,
@@ -185,10 +184,9 @@ public class StepActions implements Serializable {
 				"GROUP BY resId")
 			.as(Encoders.bean(KeyValueSet.class));
 
-		// resultParent.foreach((ForeachFunction<KeyValueSet>)kv ->
-		// System.out.println(OBJECT_MAPPER.writeValueAsString(kv)));
+
 		// create new relations from result to organization for each result linked to a leaf
-		Dataset<Relation> tmp = resultParent
+		return resultParent
 			.flatMap(
 				(FlatMapFunction<KeyValueSet, Relation>) v -> v
 					.getValueSet()
 					.collect(Collectors.toList())
 					.iterator(),
 				Encoders.bean(Relation.class));
-		tmp.foreach((ForeachFunction<Relation>) r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
-		return tmp;
+
+
 	}
 }
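Reviewer note (illustrative, not part of the patch): with the debug foreach calls gone, the method now returns the flatMap result directly; that flatMap turns each (key, valueSet) row into one relation per value in the set. A minimal sketch of that expansion with simplified stand-in beans (Grouping for KeyValueSet, Link for Relation; class and field names are assumptions, not the project's API):

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class FlatMapSketch {

	public static class Grouping implements Serializable {
		private String key;
		private List<String> valueSet;

		public String getKey() { return key; }
		public void setKey(String key) { this.key = key; }
		public List<String> getValueSet() { return valueSet; }
		public void setValueSet(List<String> valueSet) { this.valueSet = valueSet; }
	}

	public static class Link implements Serializable {
		private String source;
		private String target;

		public String getSource() { return source; }
		public void setSource(String source) { this.source = source; }
		public String getTarget() { return target; }
		public void setTarget(String target) { this.target = target; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("flatmap-sketch").getOrCreate();

		Grouping g = new Grouping();
		g.setKey("result1");
		g.setValueSet(Arrays.asList("org1", "org2"));

		Dataset<Grouping> grouped = spark.createDataset(Arrays.asList(g), Encoders.bean(Grouping.class));

		// one output record per element of the value set, mirroring the result -> organization expansion
		Dataset<Link> links = grouped
			.flatMap(
				(FlatMapFunction<Grouping, Link>) v -> v
					.getValueSet()
					.stream()
					.map(orgId -> {
						Link l = new Link();
						l.setSource(v.getKey());
						l.setTarget(orgId);
						return l;
					})
					.collect(Collectors.toList())
					.iterator(),
				Encoders.bean(Link.class));

		links.show(); // two rows: (result1, org1) and (result1, org2)

		spark.stop();
	}
}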