From 074e9ab75ede1e139ef2c41411e4fb37b100763a Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 29 Jul 2020 17:42:50 +0200 Subject: [PATCH] refactoring --- .../dhp/oa/graph/dump/ResultMapper.java | 4 +- .../graph/dump/community/CommunitySplit.java | 109 +++---- .../community/SparkPrepareResultProject.java | 2 +- .../community/SparkSplitForCommunity.java | 6 - .../community/SparkUpdateProjectInfo.java | 1 - .../dhp/oa/graph/dump/graph/Constants.java | 2 +- .../dump/graph/CreateContextEntities.java | 3 - .../dump/graph/CreateContextRelation.java | 11 +- .../graph/dump/graph/DumpGraphEntities.java | 236 ++++++++------ .../dhp/oa/graph/dump/graph/Extractor.java | 299 +++++++++--------- .../dhp/oa/graph/dump/graph/Process.java | 101 +++--- .../dump/graph/QueryInformationSystem.java | 12 +- .../graph/dump/graph/SparkCollectAndSave.java | 110 +++---- .../dump/graph/SparkDumpRelationJob.java | 4 +- .../SparkExtractRelationFromEntities.java | 5 +- .../dump/graph/SparkOrganizationRelation.java | 218 +++++++------ 16 files changed, 579 insertions(+), 544 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java index f25087fc99..8db71167ad 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java @@ -396,8 +396,8 @@ public class ResultMapper implements Serializable { if (contextList.size() > 0) { Set hashValue = new HashSet<>(); List remainigContext = new ArrayList<>(); - contextList.forEach(c ->{ - if(!hashValue.contains(c.hashCode())){ + contextList.forEach(c -> { + if (!hashValue.contains(c.hashCode())) { remainigContext.add(c); hashValue.add(c.hashCode()); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java index b05c6646cf..f0fb163946 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplit.java @@ -1,11 +1,7 @@ + package eu.dnetlib.dhp.oa.graph.dump.community; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.Objects; @@ -13,64 +9,69 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; public class CommunitySplit implements Serializable { + public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap) { + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> 
{ + Utils.removeOutputDir(spark, outputPath); + execSplit(spark, inputPath, outputPath, communityMap.keySet());// , inputClazz); + }); + } - public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap) { - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - execSplit(spark, inputPath, outputPath, communityMap.keySet());// , inputClazz); - }); - } + private static void execSplit(SparkSession spark, String inputPath, String outputPath, + Set communities) {// }, Class inputClazz) { - private static void execSplit(SparkSession spark, String inputPath, String outputPath, - Set communities) {// }, Class inputClazz) { + Dataset result = Utils + .readPath(spark, inputPath + "/publication", CommunityResult.class) + .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); - Dataset result = Utils - .readPath(spark, inputPath + "/publication", CommunityResult.class) - .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); + communities + .stream() + .forEach(c -> printResult(c, result, outputPath)); - communities - .stream() - .forEach(c -> printResult(c, result, outputPath)); + } - } + private static void printResult(String c, Dataset result, String outputPath) { + Dataset community_products = result + .filter(r -> containsCommunity(r, c)); - private static void printResult(String c, Dataset result, String outputPath) { - Dataset community_products = result - .filter(r -> containsCommunity(r, c)); + try { + community_products.first(); + community_products + .repartition(1) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath + "/" + c); + } catch (Exception e) { - try{ - community_products.first(); - community_products - .repartition(1) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(outputPath + "/" + c); - }catch(Exception e){ + } - } + } - } - - private static boolean containsCommunity(CommunityResult r, String c) { - if (Optional.ofNullable(r.getContext()).isPresent()) { - return r - .getContext() - .stream() - .filter(con -> con.getCode().equals(c)) - .collect(Collectors.toList()) - .size() > 0; - } - return false; - } + private static boolean containsCommunity(CommunityResult r, String c) { + if (Optional.ofNullable(r.getContext()).isPresent()) { + return r + .getContext() + .stream() + .filter(con -> con.getCode().equals(c)) + .collect(Collectors.toList()) + .size() > 0; + } + return false; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java index 81fd090713..a9d44d9c10 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java @@ -25,8 +25,8 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import 
eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; import eu.dnetlib.dhp.schema.dump.oaf.Provenance; +import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; import eu.dnetlib.dhp.schema.dump.oaf.community.Project; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java index 41c47c5edd..9c64efabb6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java @@ -64,16 +64,10 @@ public class SparkSplitForCommunity implements Serializable { split.run(isSparkSessionManaged, inputPath, outputPath, communityMap); - - - } public static ISLookUpService getIsLookUpService(String isLookUpUrl) { return ISLookupClientFactory.getLookUpService(isLookUpUrl); } - - - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java index 506601f14f..1276d8495e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; - import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; public class SparkUpdateProjectInfo implements Serializable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java index c919ecd972..7c4d03c1e2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java @@ -25,6 +25,6 @@ public class Constants implements Serializable { public static final String CONTEXT_ID = "00"; public static final String CONTEXT_NS_PREFIX = "context____"; - public static final String HARVESTED = "Harvested"; + public static final String HARVESTED = "Harvested"; public static final String DEFAULT_TRUST = "0.9"; } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java index 80d47cc4e6..38edd2c167 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java @@ -46,7 +46,6 @@ public class CreateContextEntities implements Serializable { final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); - final String hdfsPath = parser.get("hdfsPath"); log.info("hdfsPath: {}", hdfsPath); @@ -90,8 +89,6 @@ public class 
CreateContextEntities implements Serializable { queryInformationSystem.getContextInformation(consumer); } - - protected void writeEntity(final R r) { try { writer.write(Utils.OBJECT_MAPPER.writeValueAsString(r)); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java index 2067499b61..871645ee34 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java @@ -11,7 +11,6 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; -import eu.dnetlib.dhp.schema.oaf.Datasource; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -24,6 +23,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.graph.*; +import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; public class CreateContextRelation implements Serializable { @@ -66,7 +66,10 @@ public class CreateContextRelation implements Serializable { cce.execute(Process::getRelation, CONTEX_RELATION_DATASOURCE, ModelSupport.getIdPrefix(Datasource.class)); log.info("Creating relations for projects... "); - cce.execute(Process::getRelation, CONTEX_RELATION_PROJECT, ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class)); + cce + .execute( + Process::getRelation, CONTEX_RELATION_PROJECT, + ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class)); } @@ -92,14 +95,14 @@ public class CreateContextRelation implements Serializable { } - public void execute(final Function> producer, String category, String prefix) throws Exception { + public void execute(final Function> producer, String category, String prefix) + throws Exception { final Consumer consumer = ci -> producer.apply(ci).forEach(c -> writeEntity(c)); queryInformationSystem.getContextRelation(consumer, category, prefix); } - protected void writeEntity(final Relation r) { try { writer.write(Utils.OBJECT_MAPPER.writeValueAsString(r)); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index 296d5e02a8..8b11b259ce 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -11,8 +11,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.dump.oaf.*; -import eu.dnetlib.dhp.schema.oaf.Journal; +import javax.swing.text.html.Option; + import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -26,12 +26,12 @@ import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.*; import 
eu.dnetlib.dhp.schema.dump.oaf.graph.*; import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.OafEntity; -import javax.swing.text.html.Option; - public class DumpGraphEntities implements Serializable { public void run(Boolean isSparkSessionManaged, @@ -68,30 +68,29 @@ public class DumpGraphEntities implements Serializable { break; case "10": runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - datasourceMap(spark, inputPath, outputPath, inputClazz); + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + datasourceMap(spark, inputPath, outputPath, inputClazz); - }); + }); break; } } - private static void datasourceMap(SparkSession spark, String inputPath, String outputPath, Class inputClazz) { + private static void datasourceMap(SparkSession spark, String inputPath, String outputPath, + Class inputClazz) { Utils - .readPath(spark, inputPath, inputClazz) - .map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + .readPath(spark, inputPath, inputClazz) + .map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); } - - private static void projectMap(SparkSession spark, String inputPath, String outputPath, Class inputClazz) { Utils @@ -110,84 +109,120 @@ public class DumpGraphEntities implements Serializable { Optional.ofNullable(d.getOriginalId()).ifPresent(oId -> datasource.setOriginalId(oId)); - Optional.ofNullable(d.getPid()) - .ifPresent(pids -> pids.stream().map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) - .collect(Collectors.toList())); + Optional + .ofNullable(d.getPid()) + .ifPresent( + pids -> pids + .stream() + .map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList())); - Optional.ofNullable(d.getDatasourcetype()) - .ifPresent(dsType -> datasource.setDatasourcetype(ControlledField.newInstance(dsType.getClassid(), dsType.getClassname()))); + Optional + .ofNullable(d.getDatasourcetype()) + .ifPresent( + dsType -> datasource + .setDatasourcetype(ControlledField.newInstance(dsType.getClassid(), dsType.getClassname()))); - Optional.ofNullable(d.getOpenairecompatibility()) - .ifPresent(v -> datasource.setOpenairecompatibility(v.getClassname())); + Optional + .ofNullable(d.getOpenairecompatibility()) + .ifPresent(v -> datasource.setOpenairecompatibility(v.getClassname())); - Optional.ofNullable(d.getOfficialname()) - .ifPresent(oname -> datasource.setOfficialname(oname.getValue())); + Optional + .ofNullable(d.getOfficialname()) + .ifPresent(oname -> datasource.setOfficialname(oname.getValue())); - Optional.ofNullable(d.getEnglishname()) - .ifPresent(ename -> datasource.setEnglishname(ename.getValue())); + Optional + .ofNullable(d.getEnglishname()) + .ifPresent(ename -> datasource.setEnglishname(ename.getValue())); - Optional.ofNullable(d.getWebsiteurl()) - .ifPresent(wsite -> datasource.setWebsiteurl(wsite.getValue())); + Optional + .ofNullable(d.getWebsiteurl()) + .ifPresent(wsite -> datasource.setWebsiteurl(wsite.getValue())); - Optional.ofNullable(d.getLogourl()) - .ifPresent(lurl -> datasource.setLogourl(lurl.getValue())); + Optional + .ofNullable(d.getLogourl()) + 
.ifPresent(lurl -> datasource.setLogourl(lurl.getValue())); - Optional.ofNullable(d.getDateofvalidation()) - .ifPresent(dval -> datasource.setDateofvalidation(dval.getValue())); + Optional + .ofNullable(d.getDateofvalidation()) + .ifPresent(dval -> datasource.setDateofvalidation(dval.getValue())); - Optional.ofNullable(d.getDescription()) - .ifPresent(dex -> datasource.setDescription(dex.getValue())); + Optional + .ofNullable(d.getDescription()) + .ifPresent(dex -> datasource.setDescription(dex.getValue())); - Optional.ofNullable(d.getSubjects()) - .ifPresent(sbjs -> datasource.setSubjects(sbjs.stream().map(sbj -> sbj.getValue()).collect(Collectors.toList()))); + Optional + .ofNullable(d.getSubjects()) + .ifPresent( + sbjs -> datasource.setSubjects(sbjs.stream().map(sbj -> sbj.getValue()).collect(Collectors.toList()))); - Optional.ofNullable(d.getOdpolicies()) - .ifPresent(odp->datasource.setPolicies(Arrays.asList(odp.getValue()))); + Optional + .ofNullable(d.getOdpolicies()) + .ifPresent(odp -> datasource.setPolicies(Arrays.asList(odp.getValue()))); - Optional.ofNullable(d.getOdlanguages()) - .ifPresent(langs -> datasource.setLanguages(langs.stream().map(lang -> lang.getValue()).collect(Collectors.toList()))); + Optional + .ofNullable(d.getOdlanguages()) + .ifPresent( + langs -> datasource + .setLanguages(langs.stream().map(lang -> lang.getValue()).collect(Collectors.toList()))); - Optional.ofNullable(d.getOdcontenttypes()) - .ifPresent(ctypes -> datasource.setContenttypes(ctypes.stream().map(ctype -> ctype.getValue()).collect(Collectors.toList()))); + Optional + .ofNullable(d.getOdcontenttypes()) + .ifPresent( + ctypes -> datasource + .setContenttypes(ctypes.stream().map(ctype -> ctype.getValue()).collect(Collectors.toList()))); - Optional.ofNullable(d.getReleasestartdate()) - .ifPresent(rd -> datasource.setReleasestartdate(rd.getValue())); + Optional + .ofNullable(d.getReleasestartdate()) + .ifPresent(rd -> datasource.setReleasestartdate(rd.getValue())); - Optional.ofNullable(d.getReleaseenddate()) - .ifPresent(ed -> datasource.setReleaseenddate(ed.getValue())); + Optional + .ofNullable(d.getReleaseenddate()) + .ifPresent(ed -> datasource.setReleaseenddate(ed.getValue())); - Optional.ofNullable(d.getMissionstatementurl()) - .ifPresent(ms -> datasource.setMissionstatementurl(ms.getValue())); + Optional + .ofNullable(d.getMissionstatementurl()) + .ifPresent(ms -> datasource.setMissionstatementurl(ms.getValue())); - Optional.ofNullable(d.getDatabaseaccesstype()) - .ifPresent(ar -> datasource.setAccessrights(ar.getValue())); + Optional + .ofNullable(d.getDatabaseaccesstype()) + .ifPresent(ar -> datasource.setAccessrights(ar.getValue())); - Optional.ofNullable(d.getDatauploadtype()) - .ifPresent(dut -> datasource.setUploadrights(dut.getValue())); + Optional + .ofNullable(d.getDatauploadtype()) + .ifPresent(dut -> datasource.setUploadrights(dut.getValue())); - Optional.ofNullable(d.getDatabaseaccessrestriction()) - .ifPresent(dar ->datasource.setDatabaseaccessrestriction(dar.getValue())); + Optional + .ofNullable(d.getDatabaseaccessrestriction()) + .ifPresent(dar -> datasource.setDatabaseaccessrestriction(dar.getValue())); - Optional.ofNullable(d.getDatauploadrestriction()) - .ifPresent(dur -> datasource.setDatauploadrestriction(dur.getValue())); + Optional + .ofNullable(d.getDatauploadrestriction()) + .ifPresent(dur -> datasource.setDatauploadrestriction(dur.getValue())); - Optional.ofNullable(d.getVersioning()) - .ifPresent(v->datasource.setVersioning(v.getValue())); + Optional + 
.ofNullable(d.getVersioning()) + .ifPresent(v -> datasource.setVersioning(v.getValue())); - Optional.ofNullable(d.getCitationguidelineurl()) - .ifPresent(cu -> datasource.setCitationguidelineurl(cu.getValue())); + Optional + .ofNullable(d.getCitationguidelineurl()) + .ifPresent(cu -> datasource.setCitationguidelineurl(cu.getValue())); - Optional.ofNullable(d.getPidsystems()) - .ifPresent(ps -> datasource.setPidsystems(ps.getValue())); + Optional + .ofNullable(d.getPidsystems()) + .ifPresent(ps -> datasource.setPidsystems(ps.getValue())); - Optional.ofNullable(d.getCertificates()) - .ifPresent(c -> datasource.setCertificates(c.getValue())); + Optional + .ofNullable(d.getCertificates()) + .ifPresent(c -> datasource.setCertificates(c.getValue())); - Optional.ofNullable(d.getPolicies()) - .ifPresent(ps -> datasource.setPolicies(ps.stream().map(p -> p.getValue()).collect(Collectors.toList()))); + Optional + .ofNullable(d.getPolicies()) + .ifPresent(ps -> datasource.setPolicies(ps.stream().map(p -> p.getValue()).collect(Collectors.toList()))); - Optional.ofNullable(d.getJournal()) - .ifPresent(j -> datasource.setJournal(getContainer(j))); + Optional + .ofNullable(d.getJournal()) + .ifPresent(j -> datasource.setJournal(getContainer(j))); return datasource; @@ -196,38 +231,49 @@ public class DumpGraphEntities implements Serializable { private static Container getContainer(Journal j) { Container c = new Container(); - Optional.ofNullable(j.getName()) - .ifPresent(n->c.setName(n)); + Optional + .ofNullable(j.getName()) + .ifPresent(n -> c.setName(n)); - Optional.ofNullable(j.getIssnPrinted()) - .ifPresent(issnp -> c.setIssnPrinted(issnp)); + Optional + .ofNullable(j.getIssnPrinted()) + .ifPresent(issnp -> c.setIssnPrinted(issnp)); - Optional.ofNullable(j.getIssnOnline()) - .ifPresent(issno -> c.setIssnOnline(issno)); + Optional + .ofNullable(j.getIssnOnline()) + .ifPresent(issno -> c.setIssnOnline(issno)); - Optional.ofNullable(j.getIssnLinking()) - .ifPresent(isnl -> c.setIssnLinking(isnl)); + Optional + .ofNullable(j.getIssnLinking()) + .ifPresent(isnl -> c.setIssnLinking(isnl)); - Optional.ofNullable(j.getEp()) - .ifPresent(ep -> c.setEp(ep)); + Optional + .ofNullable(j.getEp()) + .ifPresent(ep -> c.setEp(ep)); - Optional.ofNullable(j.getIss()) - .ifPresent(iss -> c.setIss(iss)); + Optional + .ofNullable(j.getIss()) + .ifPresent(iss -> c.setIss(iss)); - Optional.ofNullable(j.getSp()) - .ifPresent(sp -> c.setSp(sp)); + Optional + .ofNullable(j.getSp()) + .ifPresent(sp -> c.setSp(sp)); - Optional.ofNullable(j.getVol()) - .ifPresent(vol -> c.setVol(vol)); + Optional + .ofNullable(j.getVol()) + .ifPresent(vol -> c.setVol(vol)); - Optional.ofNullable(j.getEdition()) - .ifPresent(edition -> c.setEdition(edition)); + Optional + .ofNullable(j.getEdition()) + .ifPresent(edition -> c.setEdition(edition)); - Optional.ofNullable(j.getConferencedate()) - .ifPresent(cdate -> c.setConferencedate(cdate)); + Optional + .ofNullable(j.getConferencedate()) + .ifPresent(cdate -> c.setConferencedate(cdate)); - Optional.ofNullable(j.getConferenceplace()) - .ifPresent(cplace -> c.setConferenceplace(cplace)); + Optional + .ofNullable(j.getConferenceplace()) + .ifPresent(cplace -> c.setConferenceplace(cplace)); return c; } @@ -466,8 +512,6 @@ public class DumpGraphEntities implements Serializable { .map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) .collect(Collectors.toList()))); - - return organization; } } diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java index ae77f7dfcf..ee8734f934 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java @@ -1,5 +1,17 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.*; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; @@ -13,37 +25,25 @@ import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; - -import java.io.Serializable; -import java.util.*; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; public class Extractor implements Serializable { - public void run(Boolean isSparkSessionManaged, - String inputPath, - String outputPath, - Class inputClazz, - CommunityMap communityMap) { - - SparkConf conf = new SparkConf(); - - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - extractRelationResult(spark, inputPath, outputPath, inputClazz, communityMap); - }); - } + public void run(Boolean isSparkSessionManaged, + String inputPath, + String outputPath, + Class inputClazz, + CommunityMap communityMap) { + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + extractRelationResult(spark, inputPath, outputPath, inputClazz, communityMap); + }); + } // private static void extractRelationProjects(SparkSession spark, String inputPath, String outputPath){ // Utils.readPath(spark, inputPath, Project.class) @@ -70,138 +70,139 @@ public class Extractor implements Serializable { // .json(outputPath); // } + private void extractRelationResult(SparkSession spark, + String inputPath, + String outputPath, + Class inputClazz, + CommunityMap communityMap) { - private void extractRelationResult(SparkSession spark, - String inputPath, - String outputPath, - Class inputClazz, - CommunityMap communityMap) { + Set hashCodes = new HashSet<>(); - Set hashCodes = new HashSet<>(); + Utils + .readPath(spark, inputPath, inputClazz) + .flatMap((FlatMapFunction) value -> { + List relationList = new ArrayList<>(); + Optional + .ofNullable(value.getInstance()) + .ifPresent(inst -> inst.forEach(instance -> { + Optional + .ofNullable(instance.getCollectedfrom()) + .ifPresent( + cf -> getRelatioPair( + value, relationList, cf, + ModelConstants.IS_PROVIDED_BY, ModelConstants.PROVIDES, hashCodes)); + Optional + .ofNullable(instance.getHostedby()) + .ifPresent( + hb -> getRelatioPair( + value, relationList, hb, + Constants.IS_HOSTED_BY, Constants.HOSTS, 
hashCodes)); + })); + Set communities = communityMap.keySet(); + Optional + .ofNullable(value.getContext()) + .ifPresent(contexts -> contexts.forEach(context -> { + String id = context.getId(); + if (id.contains(":")) { + id = id.substring(0, id.indexOf(":")); + } + if (communities.contains(id)) { + String contextId = Utils.getContextId(id); + Provenance provenance = Optional + .ofNullable(context.getDataInfo()) + .map( + dinfo -> Optional + .ofNullable(dinfo.get(0).getProvenanceaction()) + .map( + paction -> Provenance + .newInstance( + paction.getClassid(), + dinfo.get(0).getTrust())) + .orElse(null)) + .orElse(null); + Relation r = getRelation( + value.getId(), contextId, + Constants.RESULT_ENTITY, + Constants.CONTEXT_ENTITY, + ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, provenance); + if (!hashCodes.contains(r.hashCode())) { + relationList + .add(r); + hashCodes.add(r.hashCode()); + } + r = getRelation( + contextId, value.getId(), + Constants.CONTEXT_ENTITY, + Constants.RESULT_ENTITY, + ModelConstants.RELATIONSHIP, + ModelConstants.IS_RELATED_TO, provenance); + if (!hashCodes.contains(r.hashCode())) { + relationList + .add( + r); + hashCodes.add(r.hashCode()); + } - Utils - .readPath(spark, inputPath, inputClazz) - .flatMap((FlatMapFunction) value -> { - List relationList = new ArrayList<>(); - Optional - .ofNullable(value.getInstance()) - .ifPresent(inst -> inst.forEach(instance -> { - Optional - .ofNullable(instance.getCollectedfrom()) - .ifPresent(cf -> - getRelatioPair(value, relationList, cf, - ModelConstants.IS_PROVIDED_BY, ModelConstants.PROVIDES, hashCodes) - ); - Optional - .ofNullable(instance.getHostedby()) - .ifPresent(hb -> getRelatioPair(value, relationList, hb, - Constants.IS_HOSTED_BY, Constants.HOSTS , hashCodes) ); - })); - Set communities = communityMap.keySet(); - Optional - .ofNullable(value.getContext()) - .ifPresent(contexts -> contexts.forEach(context -> { - String id = context.getId(); - if (id.contains(":")) { - id = id.substring(0, id.indexOf(":")); - } - if (communities.contains(id)) { - String contextId = Utils.getContextId(id); - Provenance provenance = Optional - .ofNullable(context.getDataInfo()) - .map( - dinfo -> Optional - .ofNullable(dinfo.get(0).getProvenanceaction()) - .map( - paction -> Provenance - .newInstance( - paction.getClassid(), - dinfo.get(0).getTrust())) - .orElse(null)) - .orElse(null); - Relation r = getRelation( - value.getId(), contextId, - Constants.RESULT_ENTITY, - Constants.CONTEXT_ENTITY, - ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, provenance); - if(!hashCodes.contains(r.hashCode())){ - relationList - .add(r); - hashCodes.add(r.hashCode()); - } - r = getRelation( - contextId, value.getId(), - Constants.CONTEXT_ENTITY, - Constants.RESULT_ENTITY, - ModelConstants.RELATIONSHIP, - ModelConstants.IS_RELATED_TO, provenance); - if(!hashCodes.contains(r.hashCode())){ - relationList - .add( - r); - hashCodes.add(r.hashCode()); - } + } - } + })); - })); + return relationList.iterator(); + }, Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); - return relationList.iterator(); - }, Encoders.bean(Relation.class)) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(outputPath); + } - } + private static void getRelatioPair(R value, List relationList, KeyValue cf, + String result_dtasource, String datasource_result, + Set hashCodes) { + Provenance provenance = Optional + .ofNullable(cf.getDataInfo()) + .map( + 
dinfo -> Optional + .ofNullable(dinfo.getProvenanceaction()) + .map( + paction -> Provenance + .newInstance( + paction.getClassid(), + dinfo.getTrust())) + .orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST))) + .orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST)); + Relation r = getRelation( + value.getId(), + cf.getKey(), Constants.RESULT_ENTITY, Constants.DATASOURCE_ENTITY, + result_dtasource, ModelConstants.PROVISION, + provenance); + if (!hashCodes.contains(r.hashCode())) { + relationList + .add(r); + hashCodes.add(r.hashCode()); + } - private static void getRelatioPair(R value, List relationList, KeyValue cf, - String result_dtasource, String datasource_result, - Set hashCodes) { - Provenance provenance = Optional - .ofNullable(cf.getDataInfo()) - .map( - dinfo -> Optional - .ofNullable(dinfo.getProvenanceaction()) - .map( - paction -> Provenance - .newInstance( - paction.getClassid(), - dinfo.getTrust())) - .orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST))) - .orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST)); - Relation r = getRelation( - value.getId(), - cf.getKey(), Constants.RESULT_ENTITY, Constants.DATASOURCE_ENTITY, - result_dtasource, ModelConstants.PROVISION, - provenance); - if(!hashCodes.contains(r.hashCode())){ - relationList - .add(r); - hashCodes.add(r.hashCode()); - } + r = getRelation( + cf.getKey(), value.getId(), + Constants.DATASOURCE_ENTITY, Constants.RESULT_ENTITY, + datasource_result, ModelConstants.PROVISION, + provenance); - r = getRelation( - cf.getKey(), value.getId(), - Constants.DATASOURCE_ENTITY, Constants.RESULT_ENTITY, - datasource_result, ModelConstants.PROVISION, - provenance); + if (!hashCodes.contains(r.hashCode())) { + relationList + .add(r); + hashCodes.add(r.hashCode()); + } - if(!hashCodes.contains(r.hashCode())){ - relationList - .add(r); - hashCodes.add(r.hashCode()); - } + } - } - - private static Relation getRelation(String source, String target, String sourceType, String targetType, - String relName, String relType, Provenance provenance) { - Relation r = new Relation(); - r.setSource(Node.newInstance(source, sourceType)); - r.setTarget(Node.newInstance(target, targetType)); - r.setReltype(RelType.newInstance(relName, relType)); - r.setProvenance(provenance); - return r; - } + private static Relation getRelation(String source, String target, String sourceType, String targetType, + String relName, String relType, Provenance provenance) { + Relation r = new Relation(); + r.setSource(Node.newInstance(source, sourceType)); + r.setTarget(Node.newInstance(target, targetType)); + r.setReltype(RelType.newInstance(relName, relType)); + r.setProvenance(provenance); + return r; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java index e4ce936730..2ae7e358a2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java @@ -1,5 +1,10 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + import eu.dnetlib.dhp.oa.graph.dump.Constants; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -7,65 +12,61 @@ import 
eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.Provenance; import eu.dnetlib.dhp.schema.dump.oaf.graph.*; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - public class Process implements Serializable { - public static R getEntity(ContextInfo ci) { - try { - ResearchInitiative ri; - if (ci.getType().equals("community")) { - ri = new ResearchCommunity(); - ((ResearchCommunity) ri).setSubject(ci.getSubject()); - ri.setType(Constants.RESEARCH_COMMUNITY); - } else { - ri = new ResearchInitiative(); - ri.setType(Constants.RESEARCH_INFRASTRUCTURE); - } - ri.setId(Utils.getContextId(ci.getId())); - ri.setOriginalId(ci.getId()); + public static R getEntity(ContextInfo ci) { + try { + ResearchInitiative ri; + if (ci.getType().equals("community")) { + ri = new ResearchCommunity(); + ((ResearchCommunity) ri).setSubject(ci.getSubject()); + ri.setType(Constants.RESEARCH_COMMUNITY); + } else { + ri = new ResearchInitiative(); + ri.setType(Constants.RESEARCH_INFRASTRUCTURE); + } + ri.setId(Utils.getContextId(ci.getId())); + ri.setOriginalId(ci.getId()); - ri.setDescription(ci.getDescription()); - ri.setName(ci.getName()); - ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity()); - return (R) ri; + ri.setDescription(ci.getDescription()); + ri.setName(ci.getName()); + ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity()); + return (R) ri; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } - public static List getRelation(ContextInfo ci) { - try { + public static List getRelation(ContextInfo ci) { + try { - List relationList = new ArrayList<>(); - ci - .getDatasourceList() - .forEach(ds -> { - Relation direct = new Relation(); - Relation inverse = new Relation(); - String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2)); - direct.setSource(Node.newInstance(Utils.getContextId(ci.getId()), "context")); - direct.setTarget(Node.newInstance(ds, nodeType)); - direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); - direct.setProvenance(Provenance.newInstance("Harvested", "09")); - relationList.add(direct); + List relationList = new ArrayList<>(); + ci + .getDatasourceList() + .forEach(ds -> { + Relation direct = new Relation(); + Relation inverse = new Relation(); + String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2)); + direct.setSource(Node.newInstance(Utils.getContextId(ci.getId()), "context")); + direct.setTarget(Node.newInstance(ds, nodeType)); + direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); + direct.setProvenance(Provenance.newInstance("Harvested", "09")); + relationList.add(direct); - inverse.setTarget(Node.newInstance(Utils.getContextId(ci.getId()), "context")); - inverse.setSource(Node.newInstance(ds, nodeType)); - inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); - inverse.setProvenance(Provenance.newInstance("Harvested", "09")); - relationList.add(inverse); + inverse.setTarget(Node.newInstance(Utils.getContextId(ci.getId()), "context")); + inverse.setSource(Node.newInstance(ds, nodeType)); + inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); + inverse.setProvenance(Provenance.newInstance("Harvested", "09")); + relationList.add(inverse); - }); + }); - return 
relationList; + return relationList; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java index 43c6160fb8..e75201b0b7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java @@ -9,7 +9,6 @@ import java.util.*; import java.util.function.Consumer; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelSupport; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -17,6 +16,7 @@ import org.dom4j.Node; import org.dom4j.io.SAXReader; import org.jetbrains.annotations.NotNull; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -52,7 +52,7 @@ public class QueryInformationSystem { cinfo.setId(cSplit[0]); cinfo.setName(cSplit[1]); cinfo.setDescription(cSplit[2]); - if (!cSplit[3].trim().equals("")){ + if (!cSplit[3].trim().equals("")) { cinfo.setSubject(Arrays.asList(cSplit[3].split(","))); } cinfo.setZenodocommunity(cSplit[4]); @@ -98,7 +98,7 @@ public class QueryInformationSystem { Iterator it = root.elementIterator(); while (it.hasNext()) { Element el = (Element) it.next(); - if(el.getName().equals("category")){ + if (el.getName().equals("category")) { String categoryId = el.attributeValue("id"); categoryId = categoryId.substring(categoryId.lastIndexOf("::") + 2); if (categoryId.equals(category)) { @@ -119,9 +119,9 @@ public class QueryInformationSystem { @NotNull private List getCategoryList(Element el, String prefix) { List datasourceList = new ArrayList<>(); - for(Object node : el.selectNodes(".//param")){ - Node n = (Node)node; - if(n.valueOf("./@name").equals("openaireId")){ + for (Object node : el.selectNodes(".//param")) { + Node n = (Node) node; + if (n.valueOf("./@name").equals("openaireId")) { datasourceList.add(prefix + "|" + n.getText()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java index c1d500e767..2abb9bbf63 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java @@ -1,13 +1,12 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.Optional; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.Result; -import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.SaveMode; @@ -15,69 +14,72 @@ import 
org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; public class SparkCollectAndSave implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkCollectAndSave.class); + private static final Logger log = LoggerFactory.getLogger(SparkCollectAndSave.class); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkCollectAndSave.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json")); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkCollectAndSave.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath + "/result"); - run(spark, inputPath, outputPath); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath + "/result"); + run(spark, inputPath, outputPath); - }); + }); + } - } + private static void run(SparkSession spark, String inputPath, String outputPath) { + Utils + .readPath(spark, inputPath + "/result/publication", Result.class) + .union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class)) + .union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct", Result.class)) + .union(Utils.readPath(spark, inputPath + "/result/software", Result.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); - private static void run(SparkSession spark, String inputPath, String outputPath) { - Utils.readPath(spark, inputPath + "/result/publication", Result.class) - .union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class)) - .union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct" , Result.class)) - .union(Utils.readPath(spark, inputPath + "/result/software", Result.class)) - .write() - .option("compression", "gzip") - 
.mode(SaveMode.Overwrite) - .json(outputPath); + Utils + .readPath(spark, inputPath + "/relation/publication", Relation.class) + .union(Utils.readPath(spark, inputPath + "/relation/dataset", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/orp", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/software", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/contextOrg", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/context", Relation.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath + "/relation"); - Utils.readPath(spark, inputPath +"/relation/publication", Relation.class) - .union(Utils.readPath(spark, inputPath + "/relation/dataset", Relation.class)) - .union(Utils.readPath(spark, inputPath + "/relation/orp", Relation.class)) - .union(Utils.readPath(spark, inputPath + "/relation/software", Relation.class)) - .union(Utils.readPath(spark, inputPath + "/relation/contextOrg", Relation.class)) - .union(Utils.readPath(spark, inputPath + "/relation/context", Relation.class)) - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .json(outputPath + "/relation"); - - } + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java index d167367528..3b9497ccdb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java @@ -29,9 +29,9 @@ public class SparkDumpRelationJob implements Serializable { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - SparkDumpRelationJob.class + SparkDumpRelationJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_whole/input_relationdump_parameters.json")); + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_relationdump_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java index 14b5732f5d..a580e7de0f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java @@ -1,9 +1,9 @@ package eu.dnetlib.dhp.oa.graph.dump.graph; - import java.io.Serializable; import java.util.*; + import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +14,6 @@ import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.schema.oaf.Result; - public class SparkExtractRelationFromEntities implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkExtractRelationFromEntities.class); @@ -55,8 +54,6 @@ public class SparkExtractRelationFromEntities implements Serializable { Extractor extractor = new Extractor(); extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap); - } 
- } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java index e018fccdc6..0105fd8710 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java @@ -1,34 +1,7 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; -import com.google.gson.Gson; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -import eu.dnetlib.dhp.oa.graph.dump.Utils; - -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.dump.oaf.Provenance; -import eu.dnetlib.dhp.schema.dump.oaf.graph.Node; -import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType; -import eu.dnetlib.dhp.schema.oaf.DataInfo; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -import org.apache.avro.generic.GenericData; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.ForeachFunction; - -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.BufferedWriter; import java.io.IOException; @@ -38,108 +11,131 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.atomic.AtomicReference; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import org.apache.avro.generic.GenericData; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.Provenance; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Node; +import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; public class SparkOrganizationRelation implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkOrganizationRelation.class); + private static final Logger log = LoggerFactory.getLogger(SparkOrganizationRelation.class); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkOrganizationRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_whole/input_organization_parameters.json")); + public static void main(String[] args) throws Exception { + String 
jsonConfiguration = IOUtils + .toString( + SparkOrganizationRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_organization_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + final OrganizationMap organizationMap = new Gson() + .fromJson(parser.get("organizationCommunityMap"), OrganizationMap.class); + log.info("organization map : {}", new Gson().toJson(organizationMap)); - final OrganizationMap organizationMap = new Gson().fromJson(parser.get("organizationCommunityMap"), OrganizationMap.class); - log.info("organization map : {}", new Gson().toJson(organizationMap)); + SparkConf conf = new SparkConf(); + AtomicReference> relationSet = null; - SparkConf conf = new SparkConf(); - AtomicReference> relationSet = null; + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + writeRelations(spark, extractRelation(spark, inputPath, organizationMap), outputPath, organizationMap); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - writeRelations(spark, extractRelation(spark, inputPath, organizationMap), outputPath, organizationMap); + }); - }); + } - } + private static void writeRelations(SparkSession spark, Set rels, String outputPath, + OrganizationMap organizationMap) { - private static void writeRelations(SparkSession spark, Set rels, String outputPath, OrganizationMap organizationMap) { + List relList = new ArrayList<>(); - List relList = new ArrayList<>(); + rels.forEach(oId -> { + organizationMap.get(oId).forEach(community -> { + eu.dnetlib.dhp.schema.dump.oaf.graph.Relation direct = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); + eu.dnetlib.dhp.schema.dump.oaf.graph.Relation inverse = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); + String id = Utils.getContextId(community); + direct.setSource(Node.newInstance(id, "context")); + direct.setTarget(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2)))); + direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); + direct.setProvenance(Provenance.newInstance("Harvested", "0.9")); + relList.add(direct); + inverse.setTarget(Node.newInstance(id, "context")); + inverse.setSource(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2)))); + inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); + inverse.setProvenance(Provenance.newInstance("Harvested", "0.9")); + 
relList.add(inverse); - rels.forEach(oId -> { - organizationMap.get(oId).forEach(community -> { - eu.dnetlib.dhp.schema.dump.oaf.graph.Relation direct = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); - eu.dnetlib.dhp.schema.dump.oaf.graph.Relation inverse = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); - String id = Utils.getContextId(community); - direct.setSource(Node.newInstance(id, "context")); - direct.setTarget(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2)))); - direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); - direct.setProvenance(Provenance.newInstance("Harvested", "0.9")); - relList.add(direct); - inverse.setTarget(Node.newInstance(id, "context")); - inverse.setSource(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2)))); - inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); - inverse.setProvenance(Provenance.newInstance("Harvested", "0.9")); - relList.add(inverse); + }); - }); + }); - }); + spark + .createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - spark.createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - } + private static Set extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap) { + Dataset tmp = Utils.readPath(spark, inputPath, Relation.class); + Set organizationSet = organizationMap.keySet(); + Set toCreateRels = new HashSet<>(); + tmp.foreach((ForeachFunction) relation -> { + Optional odInfo = Optional.ofNullable(relation.getDataInfo()); + if (odInfo.isPresent()) { + if (!odInfo.get().getDeletedbyinference()) { + if (relation.getRelClass().equals(ModelConstants.MERGES)) { + String oId = relation.getTarget(); + if (organizationSet.contains(oId)) { + organizationSet.remove(oId); + toCreateRels.add(relation.getSource()); + } + } + } + } + }); + toCreateRels.addAll(organizationSet); + return toCreateRels; - private static Set extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap) { - Dataset tmp = Utils.readPath(spark, inputPath, Relation.class); - Set organizationSet = organizationMap.keySet(); - Set toCreateRels = new HashSet<>(); + } - tmp.foreach((ForeachFunction) relation -> { - Optional odInfo = Optional.ofNullable(relation.getDataInfo()); - if (odInfo.isPresent()) { - if (!odInfo.get().getDeletedbyinference()) { - if(relation.getRelClass().equals(ModelConstants.MERGES)){ - String oId = relation.getTarget(); - if (organizationSet.contains(oId)) { - organizationSet.remove(oId); - toCreateRels.add(relation.getSource()); - } - } - } - }}); - - toCreateRels.addAll(organizationSet); - return toCreateRels; - - } - - - - } \ No newline at end of file +}
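
For reference, the hashCode-based de-duplication idiom that the refactored ResultMapper and Extractor share (a HashSet of hash codes guarding each add) reduces to the following self-contained sketch; the class and method names are illustrative, not part of the patch. Note that it keeps at most one element per hash value, so two distinct objects that happen to collide on hashCode would be conflated:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HashCodeDedup {

	// Keep only the first occurrence of each hashCode, mirroring the
	// hashValue/remainigContext logic in ResultMapper and the
	// hashCodes/relationList logic in Extractor.
	public static <T> List<T> dedup(List<T> input) {
		Set<Integer> seen = new HashSet<>();
		List<T> out = new ArrayList<>();
		input.forEach(item -> {
			if (!seen.contains(item.hashCode())) {
				out.add(item);
				seen.add(item.hashCode());
			}
		});
		return out;
	}

	public static void main(String[] args) {
		System.out.println(dedup(Arrays.asList("a", "b", "a"))); // prints [a, b]
	}
}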
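
CommunitySplit.printResult guards each per-community write by calling first() inside a try block: first() throws on an empty dataset, so the json write is only reached when the community has at least one record, and the empty catch silently skips it. A sketch of an equivalent, more explicit guard follows, assuming a Spark Dataset is at hand; count() replaces the first()/catch idiom here, at the price of scanning the whole dataset where first() can stop at the first record:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

public class EmptinessGuard {

	// Write the dataset only when it is non-empty; same effect as the
	// first()-inside-try idiom in CommunitySplit.printResult, but the
	// intent is explicit and no exception is swallowed.
	public static <T> void writeIfNonEmpty(Dataset<T> ds, String outputPath) {
		if (ds.count() > 0) {
			ds
				.repartition(1)
				.write()
				.option("compression", "gzip")
				.mode(SaveMode.Overwrite)
				.json(outputPath);
		}
	}
}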
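
One caveat worth flagging in SparkOrganizationRelation.extractRelation: the Dataset.foreach lambda adds to the driver-side HashSets organizationSet and toCreateRels. foreach runs on the executors, so those mutations land on serialized copies of the sets and are in general not visible on the driver (the code only appears to work in local mode). A sketch of a collect-based variant that keeps the accumulation on the driver, reusing the Relation accessors and the ModelConstants.MERGES check from the patch:

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class ExtractRelationSketch {

	// Filter on the executors, collect the (small) surviving relations,
	// and mutate the sets on the driver only.
	public static Set<String> extractRelation(Dataset<Relation> rels, Set<String> organizationSet) {
		Set<String> toCreateRels = new HashSet<>();
		rels
			.filter(
				(FilterFunction<Relation>) r -> Optional
					.ofNullable(r.getDataInfo())
					.map(d -> !d.getDeletedbyinference())
					.orElse(false)
					&& ModelConstants.MERGES.equals(r.getRelClass()))
			.collectAsList()
			.forEach(r -> {
				String oId = r.getTarget();
				if (organizationSet.remove(oId)) {
					toCreateRels.add(r.getSource());
				}
			});
		toCreateRels.addAll(organizationSet);
		return toCreateRels;
	}
}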
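
Finally, Process.getRelation, Extractor.getRelatioPair and SparkOrganizationRelation.writeRelations all build a direct/inverse pair of relations field by field. A small helper returning both directions at once would remove that duplication; the sketch below only assumes the dump Relation/Node/RelType/Provenance API already used in the patch (the class and method names are hypothetical):

import java.util.Arrays;
import java.util.List;

import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;

public class RelationPairs {

	// Build a relation and its inverse in one call: the inverse simply
	// swaps source and target and shares reltype and provenance.
	public static List<Relation> pair(Node source, Node target, RelType type, Provenance provenance) {
		Relation direct = new Relation();
		direct.setSource(source);
		direct.setTarget(target);
		direct.setReltype(type);
		direct.setProvenance(provenance);

		Relation inverse = new Relation();
		inverse.setSource(target);
		inverse.setTarget(source);
		inverse.setReltype(type);
		inverse.setProvenance(provenance);

		return Arrays.asList(direct, inverse);
	}
}

A call site like the one in writeRelations would then shrink to relList.addAll(RelationPairs.pair(Node.newInstance(id, "context"), Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))), RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), Provenance.newInstance("Harvested", "0.9"))).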