diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java
new file mode 100644
index 000000000..d9e8ced85
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/ReadDatasourceMasterDuplicateFromDB.java
@@ -0,0 +1,76 @@
+
+package eu.dnetlib.dhp.common.action;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+
+public class ReadDatasourceMasterDuplicateFromDB {
+
+    private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services;";
+
+    public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode)
+        throws IOException {
+        final AtomicInteger count = new AtomicInteger(0);
+        try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
+            Configuration conf = new Configuration();
+            conf.set("fs.defaultFS", hdfsNameNode);
+            FileSystem fileSystem = FileSystem.get(conf);
+            FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath));
+
+            log.info("running query: {}", QUERY);
+            log.info("storing results in: {}", hdfsPath);
+
+            try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
+                // count each row actually written, so the caller logs the real number of tuples
+                dbClient.processResults(QUERY, rs -> {
+                    writeMap(datasourceMasterMap(rs), writer);
+                    count.incrementAndGet();
+                });
+            }
+        }
+        return count.get();
+    }
+
+    private static MasterDuplicate datasourceMasterMap(ResultSet rs) {
+        try {
+            final MasterDuplicate md = new MasterDuplicate();
+            final String master = rs.getString("master");
+            final String duplicate = rs.getString("duplicate");
+            md.setMaster(OafMapperUtils.createOpenaireId(10, master, true));
+            md.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true));
+
+            return md;
+        } catch (final SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) {
+        try {
+            writer.write(OBJECT_MAPPER.writeValueAsString(dm));
+            writer.newLine();
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
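For context: `DbClient` is the pre-existing dhp-common helper this class relies on. Its contract, as inferred from the usage above, is a `Closeable` that runs a query and streams each `ResultSet` row to a consumer. A minimal sketch of that shape (`DbClientSketch` is a hypothetical stand-in, not the actual class):

import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.function.Consumer;

public class DbClientSketch implements Closeable {

    private final Connection connection;

    public DbClientSketch(String dbUrl, String dbUser, String dbPassword) throws SQLException {
        this.connection = DriverManager.getConnection(dbUrl, dbUser, dbPassword);
    }

    public void processResults(String sql, Consumer<ResultSet> consumer) {
        try (Statement stmt = connection.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                consumer.accept(rs); // one callback per row, as execute() above expects
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}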
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java
new file mode 100644
index 000000000..b3e0d2aaa
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/action/model/MasterDuplicate.java
@@ -0,0 +1,29 @@
+
+package eu.dnetlib.dhp.common.action.model;
+
+import java.io.Serializable;
+
+/**
+ * @author miriam.baglioni
+ * @Date 21/07/22
+ */
+public class MasterDuplicate implements Serializable {
+    private String duplicate;
+    private String master;
+
+    public String getDuplicate() {
+        return duplicate;
+    }
+
+    public void setDuplicate(String duplicate) {
+        this.duplicate = duplicate;
+    }
+
+    public String getMaster() {
+        return master;
+    }
+
+    public void setMaster(String master) {
+        this.master = master;
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java
new file mode 100644
index 000000000..8bf36ff82
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/MasterDuplicateAction.java
@@ -0,0 +1,45 @@
+
+package eu.dnetlib.dhp.oa.graph.clean;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
+
+public class MasterDuplicateAction {
+
+    private static final Logger log = LoggerFactory.getLogger(MasterDuplicateAction.class);
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    MasterDuplicateAction.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json")));
+
+        parser.parseArgument(args);
+
+        final String dbUrl = parser.get("postgresUrl");
+        log.info("postgresUrl: {}", dbUrl);
+
+        final String dbUser = parser.get("postgresUser");
+        log.info("postgresUser: {}", dbUser);
+
+        final String dbPassword = parser.get("postgresPassword");
+        // do not leak the credential into the job logs
+        log.info("postgresPassword: xxx");
+
+        final String hdfsPath = parser.get("hdfsPath");
+        log.info("hdfsPath: {}", hdfsPath);
+
+        final String hdfsNameNode = parser.get("hdfsNameNode");
+        log.info("hdfsNameNode: {}", hdfsNameNode);
+
+        int rows = ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
+
+        log.info("written {} rows", rows);
+    }
+
+}
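The file written to `hdfsPath` is newline-delimited JSON, one `MasterDuplicate` per line, which is exactly what `CleanCfHbSparkJob` later reads back with `spark.read().textFile(...)`. A small sketch of the record layout, with hypothetical ids, assuming `OafMapperUtils.createOpenaireId(10, id, true)` hashes the portion of the id after the `::` (as `DHPUtils.md5` does elsewhere in dhp-common):

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.utils.DHPUtils;

public class MasterDuplicateFormatDemo {

    public static void main(String[] args) throws Exception {
        final MasterDuplicate md = new MasterDuplicate();
        // hypothetical datasource ids, in the "10|<nsPrefix>::<hash>" OpenAIRE form
        md.setMaster("10|openaire____::" + DHPUtils.md5("master-datasource-id"));
        md.setDuplicate("10|openaire____::" + DHPUtils.md5("duplicate-datasource-id"));

        // one such line is emitted per dsm_dedup_services row, e.g.
        // {"duplicate":"10|openaire____::...","master":"10|openaire____::..."}
        System.out.println(new ObjectMapper().writeValueAsString(md));
    }
}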
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
new file mode 100644
index 000000000..ad7e252f6
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
@@ -0,0 +1,207 @@
+
+package eu.dnetlib.dhp.oa.graph.clean.cfhb;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Aggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
+import eu.dnetlib.dhp.schema.oaf.Instance;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import scala.Tuple2;
+
+public class CleanCfHbSparkJob {
+
+    private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                CleanCfHbSparkJob.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json"));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("inputPath");
+        log.info("inputPath: {}", inputPath);
+
+        String workingPath = parser.get("workingPath");
+        log.info("workingPath: {}", workingPath);
+
+        String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        String masterDuplicatePath = parser.get("masterDuplicatePath");
+        log.info("masterDuplicatePath: {}", masterDuplicatePath);
+
+        String graphTableClassName = parser.get("graphTableClassName");
+        log.info("graphTableClassName: {}", graphTableClassName);
+
+        Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> cleanCfHb(spark, inputPath, entityClazz, workingPath, masterDuplicatePath, outputPath));
+    }
+
+    private static <T extends Result> void cleanCfHb(SparkSession spark, String inputPath, Class<T> entityClazz,
+        String workingPath, String masterDuplicatePath, String outputPath) {
+        // read the master-duplicate tuples
+        Dataset<MasterDuplicate> md = spark
+            .read()
+            .textFile(masterDuplicatePath)
+            .map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
+
+        // read the result table
+        Dataset<T> res = spark
+            .read()
+            .textFile(inputPath)
+            .map(as(entityClazz), Encoders.bean(entityClazz));
+
+        // prepare the resolved CF|HB references, initially with an EMPTY master ID
+        Dataset<IdCfHbMapping> resolved = res
+            .flatMap(
+                (FlatMapFunction<T, IdCfHbMapping>) r -> Stream
+                    .concat(
+                        r.getCollectedfrom().stream().map(KeyValue::getKey),
+                        Stream
+                            .concat(
+                                r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
+                                r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
+                    .distinct()
+                    .map(s -> asIdCfHbMapping(r.getId(), s))
+                    .iterator(),
+                Encoders.bean(IdCfHbMapping.class));
+
+        final String resolvedPath = workingPath + "/cfHbResolved";
+
+        // fill in the master ID and save the mapping aside
+        resolved
+            .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicate")))
+            .map((MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping>) t -> {
+                t._1().setMaster(t._2().getMaster());
+                return t._1();
+            }, Encoders.bean(IdCfHbMapping.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .parquet(resolvedPath);
+
+        // read back the resolved CF|HB mapping
+        Dataset<IdCfHbMapping> resolvedDS = spark
+            .read()
+            .load(resolvedPath)
+            .as(Encoders.bean(IdCfHbMapping.class));
+
+        // join the results with the resolved CF|HB mapping, apply the mapping and save
+        res
+            .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultid")), "left")
+            .groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING())
+            .agg(new IdCfHbMappingAggregator<>(entityClazz).toColumn())
+            .map((MapFunction<Tuple2<String, T>, T>) Tuple2::_2, Encoders.bean(entityClazz))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+    public static class IdCfHbMappingAggregator<T extends Result> extends Aggregator<Tuple2<T, IdCfHbMapping>, T, T> {
+
+        private final Class<T> entityClazz;
+
+        public IdCfHbMappingAggregator(Class<T> entityClazz) {
+            this.entityClazz = entityClazz;
+        }
+
+        @Override
+        public T zero() {
+            try {
+                return entityClazz.getDeclaredConstructor().newInstance();
+            } catch (ReflectiveOperationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public T reduce(T b, Tuple2<T, IdCfHbMapping> t) {
+            // the buffer starts out empty: adopt the entity carried by the first tuple
+            final T r = Objects.isNull(b.getId()) ? t._1() : b;
+            final IdCfHbMapping a = t._2();
+            if (Objects.isNull(a) || StringUtils.isBlank(a.getMaster())) {
+                return r;
+            }
+            r.getCollectedfrom().forEach(kv -> updateKey(kv, a));
+            r.getInstance().forEach(i -> {
+                updateKey(i.getHostedby(), a);
+                updateKey(i.getCollectedfrom(), a);
+            });
+            return r;
+        }
+
+        @Override
+        public T merge(T b1, T b2) {
+            if (Objects.isNull(b1.getId())) {
+                return b2;
+            }
+            return b1;
+        }
+
+        @Override
+        public T finish(T r) {
+            return r;
+        }
+
+        private void updateKey(final KeyValue kv, final IdCfHbMapping a) {
+            if (kv.getKey().equals(a.getCfhb())) {
+                kv.setKey(a.getMaster());
+            }
+        }
+
+        @Override
+        public Encoder<T> bufferEncoder() {
+            return Encoders.bean(entityClazz);
+        }
+
+        @Override
+        public Encoder<T> outputEncoder() {
+            return Encoders.bean(entityClazz);
+        }
+    }
+
+    private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) {
+        IdCfHbMapping m = new IdCfHbMapping(resultId);
+        m.setCfhb(cfHb);
+        return m;
+    }
+
+    private static <R> MapFunction<String, R> as(Class<R> clazz) {
+        return s -> OBJECT_MAPPER.readValue(s, clazz);
+    }
+}
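To make the aggregator's intent concrete, here is a minimal, self-contained sketch of the key-rewriting rule it applies via `updateKey`, exercised on a plain object rather than through Spark. The `CfHbPatchDemo` class and all ids are hypothetical; the schema classes are the real ones used by the patch:

import java.util.Collections;

import eu.dnetlib.dhp.oa.graph.clean.cfhb.IdCfHbMapping;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class CfHbPatchDemo {

    public static void main(String[] args) {
        // a publication collected from a datasource that dedup marked as a duplicate (ids are made up)
        final KeyValue cf = new KeyValue();
        cf.setKey("10|fake________::duplicate");
        cf.setValue("Duplicate datasource");

        final Publication p = new Publication();
        p.setId("50|fake________::result");
        p.setCollectedfrom(Collections.singletonList(cf));

        // the resolved tuple produced by the first join in cleanCfHb: duplicate id -> master id
        final IdCfHbMapping m = new IdCfHbMapping(p.getId());
        m.setCfhb("10|fake________::duplicate");
        m.setMaster("10|fake________::master");

        // same rule the aggregator applies: swap the duplicate key for its master
        p.getCollectedfrom().forEach(kv -> {
            if (kv.getKey().equals(m.getCfhb())) {
                kv.setKey(m.getMaster());
            }
        });

        System.out.println(p.getCollectedfrom().get(0).getKey()); // prints 10|fake________::master
    }
}

The same swap is applied to each instance's hostedby and collectedfrom keys, which is why the job collects all three key sources into a single distinct stream before joining.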
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java
new file mode 100644
index 000000000..16d1a2613
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/IdCfHbMapping.java
@@ -0,0 +1,44 @@
+
+package eu.dnetlib.dhp.oa.graph.clean.cfhb;
+
+import java.io.Serializable;
+
+public class IdCfHbMapping implements Serializable {
+
+    private String resultid;
+
+    private String cfhb;
+
+    private String master;
+
+    public IdCfHbMapping() {
+    }
+
+    public IdCfHbMapping(String id) {
+        this.resultid = id;
+    }
+
+    public String getResultid() {
+        return resultid;
+    }
+
+    public void setResultid(String resultid) {
+        this.resultid = resultid;
+    }
+
+    public String getCfhb() {
+        return cfhb;
+    }
+
+    public void setCfhb(String cfhb) {
+        this.cfhb = cfhb;
+    }
+
+    public String getMaster() {
+        return master;
+    }
+
+    public void setMaster(String master) {
+        this.master = master;
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
index 6435d5131..e717fac0f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
@@ -317,13 +317,13 @@
    [... one-line change inside a spark action; the markup was stripped from this extract ...]
             <master>yarn</master>
@@ -434,7 +434,6 @@
    [... one line removed; the markup was stripped from this extract ...]
             <master>yarn</master>
@@ -460,13 +459,13 @@
    [... one-line change inside a spark action; the markup was stripped from this extract ...]
             <master>yarn</master>
@@ -583,7 +582,202 @@
+
+    <decision name="shouldPatchDatasourceIds">
+        <switch>
+            <case to="get_ds_master_duplicate">${wf:conf('shouldClean') eq true}</case>
+            <default to="End"/>
+        </switch>
+    </decision>
+
+    <action name="get_ds_master_duplicate">
+        <java>
+            <main-class>eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction</main-class>
+            <arg>--postgresUrl</arg><arg>${postgresURL}</arg>
+            <arg>--postgresUser</arg><arg>${postgresUser}</arg>
+            <arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
+            <arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
+            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+        </java>
+        <ok to="fork_patch_cfhb"/>
+        <error to="Kill"/>
+    </action>
+
+    <fork name="fork_patch_cfhb">
+        <path start="patch_publication_cfhb"/>
+        <path start="patch_dataset_cfhb"/>
+        <path start="patch_otherresearchproduct_cfhb"/>
+        <path start="patch_software_cfhb"/>
+    </fork>
+
+    <action name="patch_publication_cfhb">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>patch publication cfhb</name>
+            <class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
+            <arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/publication</arg>
+            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
+            <arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
+        </spark>
+        <ok to="wait_clean_cfhb"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="patch_dataset_cfhb">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>patch dataset cfhb</name>
+            <class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
+            <arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/dataset</arg>
+            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
+            <arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
+        </spark>
+        <ok to="wait_clean_cfhb"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="patch_otherresearchproduct_cfhb">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>patch otherresearchproduct cfhb</name>
+            <class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
+            <arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/otherresearchproduct</arg>
+            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
+            <arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
+        </spark>
+        <ok to="wait_clean_cfhb"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="patch_software_cfhb">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>patch software cfhb</name>
+            <class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
+            <arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/software</arg>
+            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
+            <arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
+        </spark>
+        <ok to="wait_clean_cfhb"/>
+        <error to="Kill"/>
+    </action>
+
+    <join name="wait_clean_cfhb" to="fork_copy_cfhb"/>
+
+    <fork name="fork_copy_cfhb">
+        <path start="copy_cfhb_patched_publication"/>
+        <path start="copy_cfhb_patched_dataset"/>
+        <path start="copy_cfhb_patched_otherresearchproduct"/>
+        <path start="copy_cfhb_patched_software"/>
+    </fork>
+
+    <action name="copy_cfhb_patched_publication">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <prepare>
+                <delete path="${graphOutputPath}/publication"/>
+            </prepare>
+            <arg>${workingPath}/cfHbPatched/publication</arg>
+            <arg>${graphOutputPath}/publication</arg>
+        </distcp>
+        <ok to="copy_wait"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="copy_cfhb_patched_dataset">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <prepare>
+                <delete path="${graphOutputPath}/dataset"/>
+            </prepare>
+            <arg>${workingPath}/cfHbPatched/dataset</arg>
+            <arg>${graphOutputPath}/dataset</arg>
+        </distcp>
+        <ok to="copy_wait"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="copy_cfhb_patched_otherresearchproduct">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <prepare>
+                <delete path="${graphOutputPath}/otherresearchproduct"/>
+            </prepare>
+            <arg>${workingPath}/cfHbPatched/otherresearchproduct</arg>
+            <arg>${graphOutputPath}/otherresearchproduct</arg>
+        </distcp>
+        <ok to="copy_wait"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="copy_cfhb_patched_software">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <prepare>
+                <delete path="${graphOutputPath}/software"/>
+            </prepare>
+            <arg>${workingPath}/cfHbPatched/software</arg>
+            <arg>${graphOutputPath}/software</arg>
+        </distcp>
+        <ok to="copy_wait"/>
+        <error to="Kill"/>
+    </action>
+
+    <join name="copy_wait" to="End"/>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
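Worth noting about the wiring above: the whole patching branch hangs off the `${wf:conf('shouldClean') eq true}` check in the decision node, so the CF|HB patching and the copy of `${workingPath}/cfHbPatched/*` back into `${graphOutputPath}` run only when the workflow is submitted with the `shouldClean` property set to true; otherwise the graph tables are left as they were.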
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json
new file mode 100644
index 000000000..fbe2cca10
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json
@@ -0,0 +1,32 @@
+[
+  {
+    "paramName": "pu",
+    "paramLongName": "postgresUrl",
+    "paramDescription": "the jdbc url to the postgres database",
+    "paramRequired": true
+  },
+  {
+    "paramName": "uid",
+    "paramLongName": "postgresUser",
+    "paramDescription": "the postgres user",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pwd",
+    "paramLongName": "postgresPassword",
+    "paramDescription": "the postgres password",
+    "paramRequired": true
+  },
+  {
+    "paramName": "p",
+    "paramLongName": "hdfsPath",
+    "paramDescription": "the target path on HDFS",
+    "paramRequired": true
+  },
+  {
+    "paramName": "nn",
+    "paramLongName": "hdfsNameNode",
+    "paramDescription": "the HDFS nameNode",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json
new file mode 100644
index 000000000..8b8a5f70e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json
@@ -0,0 +1,38 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "in",
+    "paramLongName": "inputPath",
+    "paramDescription": "the path to the graph data dump to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path to store the output graph",
+    "paramRequired": true
+  },
+  {
+    "paramName": "wp",
+    "paramLongName": "workingPath",
+    "paramDescription": "the path where the resolved cf|hb mapping is staged",
+    "paramRequired": true
+  },
+  {
+    "paramName": "class",
+    "paramLongName": "graphTableClassName",
+    "paramDescription": "class name modelling the graph table",
+    "paramRequired": true
+  },
+  {
+    "paramName": "md",
+    "paramLongName": "masterDuplicatePath",
+    "paramDescription": "path to the file on HDFS holding the datasource id tuples [master, duplicate]",
+    "paramRequired": true
+  }
+]
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index 3e35021c8..ad6ceef54 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -1002,7 +1002,8 @@ class MappersTest {
 
     @Test
     void testEOSCFuture_ROHub() throws IOException {
-        final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml")));
+        final String xml = IOUtils
+            .toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml")));
         final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
         final OtherResearchProduct rocrate = (OtherResearchProduct) list.get(0);
         assertNotNull(rocrate.getEoscifguidelines());
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
index 08bf19fe4..3e1a501d1 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
@@ -1,13 +1,14 @@
+
 package eu.dnetlib.dhp.oa.provision;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
-import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
-import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
-import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrInputDocument;
@@ -18,71 +19,73 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import java.io.IOException;
-import java.io.StringReader;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
+import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
+import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
+import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
 
 public class EOSCFuture_Test {
 
-    public static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    public static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
-    public static final String VERSION = "2021-04-15T10:05:53Z";
-    public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl";
+    public static final String VERSION = "2021-04-15T10:05:53Z";
+    public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl";
 
-    private ContextMapper contextMapper;
+    private ContextMapper contextMapper;
 
-    @BeforeEach
-    public void setUp() {
-        contextMapper = new ContextMapper();
-    }
+    @BeforeEach
+    public void setUp() {
+        contextMapper = new ContextMapper();
+    }
 
+    @Test
+    public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException {
 
-    @Test
-    public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException {
+        final ContextMapper contextMapper = new ContextMapper();
 
-        final ContextMapper contextMapper = new ContextMapper();
+        final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
+            XmlConverterJob.schemaLocation);
 
-        final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-            XmlConverterJob.schemaLocation);
+        final OtherResearchProduct p = OBJECT_MAPPER
+            .readValue(
+                IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")),
+                OtherResearchProduct.class);
 
-        final OtherResearchProduct p = OBJECT_MAPPER
-            .readValue(IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")), OtherResearchProduct.class);
+        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
 
-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+        assertNotNull(xml);
 
-        assertNotNull(xml);
+        final Document doc = new SAXReader().read(new StringReader(xml));
 
-        final Document doc = new SAXReader().read(new StringReader(xml));
+        assertNotNull(doc);
+        System.out.println(doc.asXML());
 
-        assertNotNull(doc);
-        System.out.println(doc.asXML());
+        testRecordTransformation(xml);
+    }
 
+    private void testRecordTransformation(final String record) throws IOException, TransformerException {
+        final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
+        final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));
 
-        testRecordTransformation(xml);
-    }
+        final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt);
+        final Transformer tr = SaxonTransformerFactory.newInstance(transformer);
 
-    private void testRecordTransformation(final String record) throws IOException, TransformerException {
-        final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
-        final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));
+        final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record);
 
-        final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt);
+        final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID)
+            .parseDocument(indexRecordXML);
 
-        final Transformer tr = SaxonTransformerFactory.newInstance(transformer);
+        final String xmlDoc = ClientUtils.toXML(solrDoc);
 
-        final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record);
-
-        final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID)
-            .parseDocument(indexRecordXML);
-
-        final String xmlDoc = ClientUtils.toXML(solrDoc);
-
-        Assertions.assertNotNull(xmlDoc);
-        System.out.println(xmlDoc);
-    }
+        Assertions.assertNotNull(xmlDoc);
+        System.out.println(xmlDoc);
+    }
 }