diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java index 8643868771..e7dba7595e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java @@ -3,22 +3,19 @@ package eu.dnetlib.dhp.oa.graph.dump; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.io.StringReader; import java.util.*; import java.util.stream.Collectors; import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.dom4j.Document; -import org.dom4j.DocumentException; -import org.dom4j.Element; -import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +28,8 @@ import javax.management.Query; public class SparkDumpCommunityProducts implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class); + private QueryInformationSystem queryInformationSystem; + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -67,25 +66,37 @@ public class SparkDumpCommunityProducts implements Serializable { final String resultType = parser.get("resultType"); log.info("resultType: {}", resultType); + + SparkDumpCommunityProducts sdcp = new SparkDumpCommunityProducts(); + + sdcp.exec(isLookUpUrl, isSparkSessionManaged, outputPath, + inputPath, resultClassName, dumpClassName); + + } + + public QueryInformationSystem getQueryInformationSystem() { + return queryInformationSystem; + } + + public void setQueryInformationSystem(QueryInformationSystem queryInformationSystem) { + this.queryInformationSystem = queryInformationSystem; + } + + public ISLookUpService getIsLookUpService(String isLookUpUrl){ + return ISLookupClientFactory.getLookUpService(isLookUpUrl); + } + + public void exec(String isLookUpUrl, Boolean isSparkSessionManaged, String outputPath, String inputPath, + String resultClassName, String dumpClassName) throws ISLookUpException, ClassNotFoundException { + SparkConf conf = new SparkConf(); + Class inputClazz = (Class) Class.forName(resultClassName); Class dumpClazz = (Class) Class.forName(dumpClassName); - SparkDumpCommunityProducts sdcp = new SparkDumpCommunityProducts(isLookUpUrl, isSparkSessionManaged, outputPath, - inputPath, inputClazz, dumpClazz); - - - } - - public - SparkDumpCommunityProducts(String isLookUpUrl, Boolean isSparkSessionManaged, String outputPath, String inputPath, - Class inputClazz, Class dumpClazz) throws ISLookUpException { - SparkConf conf = new SparkConf(); - + queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); Map - communityMap = getCommunityMap(isLookUpUrl); - - + communityMap = queryInformationSystem.getCommunityMap(); runWithSparkSession( conf, isSparkSessionManaged, @@ -95,26 +106,6 @@ public class SparkDumpCommunityProducts implements Serializable { }); } - public Map getCommunityMap(String isLookUpUrl) throws ISLookUpException { - final Map map = new HashMap<>(); - QueryInformationSystem qis = new QueryInformationSystem(); - List communityMap = qis.getCommunityMap(isLookUpUrl); - communityMap.stream().forEach(xml -> { - final Document doc; - try { - doc = new SAXReader().read(new StringReader(xml)); - Element root = doc.getRootElement(); - map.put(root.attribute("id").getValue(), root.attribute("label").getValue()); - } catch (DocumentException e) { - e.printStackTrace(); - } - - - }); - - return map; - } - private void execDump( SparkSession spark,