diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 8b802cf803..cd13ab9e57 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -26,7 +26,7 @@ import static eu.dnetlib.dhp.PropagationConstant.*; public class SparkResultToOrganizationFromIstRepoJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_resulttoorganizationfrominstrepo_parameters.json"))); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"))); parser.parseArgument(args); SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); @@ -101,97 +101,7 @@ public class SparkResultToOrganizationFromIstRepoJob { final JavaRDD toupdateresultpublication = propagateOnResult(spark, "publication"); writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); - - /* - //get the institutional repositories - JavaPairRDD datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class)) - .filter(ds -> INSTITUTIONAL_REPO_TYPE.equals(ds.getDatasourcetype().getClassid())) - .map(ds -> new TypedRow().setSourceId(ds.getId())) - .mapToPair(toPair()); - - - JavaPairRDD rel_datasource_organization = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelType()) && RELATION_DATASOURCE_ORGANIZATION_REL_CLASS.equals(r.getRelClass())) - .map(r -> { - TypedRow tp = new TypedRow(); tp.setSourceId(r.getSource()); tp.setTargetId(r.getTarget()); - return tp; - }) - .mapToPair(toPair()); - - JavaPairRDD instdatasource_organization = datasources.join(rel_datasource_organization) - .map(x -> x._2()._2()) - .mapToPair(toPair()); - - JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)); - JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); - JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)); - JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)); - JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); - - - JavaPairRDD datasource_results = publications - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - }) - .union(datasets - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) - .union(software - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) - .union(other - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })); - - JavaRDD newRels = instdatasource_organization.join(datasource_results) - .flatMap(c -> { - List rels = new ArrayList(); - String orgId = c._2()._1().getTargetId(); - String resId = c._2()._2().getTargetId(); - rels.add(getRelation(orgId, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - rels.add(getRelation(resId, orgId, RELATION_RESULT_ORGANIZATION_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - return rels.iterator(); - }); - newRels.map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/relation_new"); - - newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/relation");*/ + } private static void writeUpdates(JavaRDD toupdateresultsoftware, JavaRDD toupdateresultdataset, JavaRDD toupdateresultother, JavaRDD toupdateresultpublication, String outputPath) { @@ -203,8 +113,8 @@ public class SparkResultToOrganizationFromIstRepoJob { } private static void createUpdateForRelationWrite(JavaRDD toupdaterelation, String outputPath, String update_type) { - toupdaterelation.map(s -> { - List relationList = Arrays.asList(); + toupdaterelation.flatMap(s -> { + List relationList = new ArrayList<>(); List orgs = s.getList(1); String resId = s.getString(0); for (String org : orgs) { @@ -220,22 +130,6 @@ public class SparkResultToOrganizationFromIstRepoJob { }).map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(outputPath + "/" + update_type); } - - private static org.apache.spark.sql.Dataset instPropagationAssoc(SparkSession spark, String cfhbTable){ - String query = "SELECT id, collect_set(org) org "+ - "FROM ( SELECT id, org " + - "FROM rels " + - "JOIN " + cfhbTable + - " ON cf = ds " + - "UNION ALL " + - "SELECT id , org " + - "FROM rels " + - "JOIN " + cfhbTable + - " ON hb = ds ) tmp " + - "GROUP BY id"; - return spark.sql(query); - } - private static JavaRDD propagateOnResult(SparkSession spark, String table) { String query; query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " +