From db27663750be42ee9d0a8bca12749bf3a5d66a1f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 11 Jun 2020 10:49:01 +0200 Subject: [PATCH] - --- .../oa/graph/dump/QueryInformationSystem.java | 38 ++------------- .../dump/SparkDumpCommunityProducts.java | 46 +++++++++++++++++-- .../dhp/oa/graph/dump/DumpJobTest.java | 10 +++- 3 files changed, 53 insertions(+), 41 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java index 7245e2021..c3bf65229 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java @@ -1,26 +1,11 @@ package eu.dnetlib.dhp.oa.graph.dump; -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.dom4j.Document; -import org.dom4j.DocumentException; -import org.dom4j.Element; -import org.dom4j.Node; -import org.dom4j.io.SAXReader; -import java.io.StringReader; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class QueryInformationSystem { private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + @@ -33,27 +18,10 @@ public class QueryInformationSystem { - public static Map getCommunityMap(final String isLookupUrl) - throws ISLookUpException, DocumentException { + public List getCommunityMap(final String isLookupUrl) + throws ISLookUpException { ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); - final List res = isLookUp.quickSearchProfile(XQUERY); - - final Map communityMap = new HashMap<>(); - - res.stream().forEach(xml -> { - final Document doc; - try { - doc = new SAXReader().read(new StringReader(xml)); - Element root = doc.getRootElement(); - communityMap.put(root.attribute("id").getValue(), root.attribute("label").getValue()); - } catch (DocumentException e) { - e.printStackTrace(); - } - - - }); - - return communityMap; + return isLookUp.quickSearchProfile(XQUERY); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java index 17bf66ee3..864386877 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java @@ -3,29 +3,35 @@ package eu.dnetlib.dhp.oa.graph.dump; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.io.StringReader; import java.util.*; import java.util.stream.Collectors; import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Result; +import javax.management.Query; public class SparkDumpCommunityProducts implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class); - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -52,7 +58,7 @@ public class SparkDumpCommunityProducts implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final String dumpClassName = parser.get("dumpClassName"); + final String dumpClassName = parser.get("dumpTableName"); log.info("dumpClassName: {}", dumpClassName); final String isLookUpUrl = parser.get("isLookUpUrl"); @@ -65,10 +71,19 @@ public class SparkDumpCommunityProducts implements Serializable { Class dumpClazz = (Class) Class.forName(dumpClassName); + SparkDumpCommunityProducts sdcp = new SparkDumpCommunityProducts(isLookUpUrl, isSparkSessionManaged, outputPath, + inputPath, inputClazz, dumpClazz); + + + } + + public + SparkDumpCommunityProducts(String isLookUpUrl, Boolean isSparkSessionManaged, String outputPath, String inputPath, + Class inputClazz, Class dumpClazz) throws ISLookUpException { SparkConf conf = new SparkConf(); Map - communityMap = QueryInformationSystem.getCommunityMap(isLookUpUrl); + communityMap = getCommunityMap(isLookUpUrl); runWithSparkSession( @@ -76,11 +91,32 @@ public class SparkDumpCommunityProducts implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - execDump(spark, inputPath, outputPath + "/" + resultType, communityMap, inputClazz, dumpClazz); + execDump(spark, inputPath, outputPath , communityMap, inputClazz, dumpClazz); }); } - private static void execDump( + public Map getCommunityMap(String isLookUpUrl) throws ISLookUpException { + final Map map = new HashMap<>(); + QueryInformationSystem qis = new QueryInformationSystem(); + List communityMap = qis.getCommunityMap(isLookUpUrl); + communityMap.stream().forEach(xml -> { + final Document doc; + try { + doc = new SAXReader().read(new StringReader(xml)); + Element root = doc.getRootElement(); + map.put(root.attribute("id").getValue(), root.attribute("label").getValue()); + } catch (DocumentException e) { + e.printStackTrace(); + } + + + }); + + return map; + } + + + private void execDump( SparkSession spark, String inputPath, String outputPath, diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java index 73c302b06..0852e4b0d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java @@ -47,7 +47,7 @@ public class DumpJobTest { private static Path workingDir; - private static String MOCK_IS_LOOK_UP_URL = "https://beta.services.openaire.eu/is/services/isLookUp"; + private static String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp"; private static final Logger log = LoggerFactory.getLogger(DumpJobTest.class); @@ -92,6 +92,12 @@ public class DumpJobTest { @Mock private SparkDumpCommunityProducts dumpCommunityProducts; + @Mock + private QueryInformationSystem queryInformationSystem; + + @Mock + private ISLookUpService isLookUpService; + private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + @@ -126,6 +132,8 @@ public class DumpJobTest { @BeforeEach public void setUp() throws ISLookUpException { lenient().when(dumpCommunityProducts.getCommunityMap(MOCK_IS_LOOK_UP_URL)).thenReturn(map); + lenient().when(queryInformationSystem.getCommunityMap(MOCK_IS_LOOK_UP_URL)).thenReturn(communityMap); + lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap); }