[graph cleaning] patch the result's collectedfrom and hostedby identifiers according to the datasource master-duplicate mapping

This commit is contained in:
Claudio Atzori 2022-11-28 09:53:23 +01:00
parent a79c47522d
commit 24ef301cc1
10 changed files with 719 additions and 56 deletions

View File

@ -0,0 +1,76 @@
package eu.dnetlib.dhp.common.action;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
public class ReadDatasourceMasterDuplicateFromDB {
private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services;";
public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode)
throws IOException {
int count = 0;
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath));
log.info("running query: {}", QUERY);
log.info("storing results in: {}", hdfsPath);
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
count++;
}
}
return count;
}
private static MasterDuplicate datasourceMasterMap(ResultSet rs) {
try {
MasterDuplicate md = new MasterDuplicate();
final String master = rs.getString("master");
final String duplicate = rs.getString("duplicate");
md.setMaster(OafMapperUtils.createOpenaireId(10, master, true));
md.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true));
return md;
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) {
try {
writer.write(OBJECT_MAPPER.writeValueAsString(dm));
writer.newLine();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.common.action.model;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 21/07/22
*/
public class MasterDuplicate implements Serializable {
private String duplicate;
private String master;
public String getDuplicate() {
return duplicate;
}
public void setDuplicate(String duplicate) {
this.duplicate = duplicate;
}
public String getMaster() {
return master;
}
public void setMaster(String master) {
this.master = master;
}
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.oa.graph.clean;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
public class MasterDuplicateAction {
private static final Logger log = LoggerFactory.getLogger(MasterDuplicateAction.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MasterDuplicateAction.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json")));
parser.parseArgument(args);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: {}", dbPassword);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode: {}", hdfsNameNode);
int rows = ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
log.info("written {} rows", rows);
}
}

View File

@ -0,0 +1,207 @@
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
public class CleanCfHbSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanCountrySparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String masterDuplicatePath = parser.get("masterDuplicatePath");
log.info("masterDuplicatePath: {}", masterDuplicatePath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
cleanCfHb(
spark, inputPath, entityClazz, workingPath, masterDuplicatePath, outputPath);
});
}
private static <T extends Result> void cleanCfHb(SparkSession spark, String inputPath, Class<T> entityClazz,
String workingPath, String masterDuplicatePath, String outputPath) {
// read the master-duplicate tuples
Dataset<MasterDuplicate> md = spark
.read()
.textFile(masterDuplicatePath)
.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
// read the result table
Dataset<T> res = spark
.read()
.textFile(inputPath)
.map(as(entityClazz), Encoders.bean(entityClazz));
// prepare the resolved CF|HB references with the corresponding EMPTY master ID
Dataset<IdCfHbMapping> resolved = res
.flatMap(
(FlatMapFunction<T, IdCfHbMapping>) r -> Stream
.concat(
r.getCollectedfrom().stream().map(KeyValue::getKey),
Stream
.concat(
r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
.distinct()
.map(s -> asIdCfHbMapping(r.getId(), s))
.iterator(),
Encoders.bean(IdCfHbMapping.class));
final String resolvedPath = workingPath + "/cfHbResolved";
// set the EMPTY master ID and save it aside
resolved
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicate")))
.map((MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping>) t -> {
t._1().setMaster(t._2().getMaster());
return t._1();
}, Encoders.bean(IdCfHbMapping.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(resolvedPath);
// read again the resolved CF|HB mapping
Dataset<IdCfHbMapping> resolvedDS = spark
.read()
.load(resolvedPath)
.as(Encoders.bean(IdCfHbMapping.class));
// Join the results with the resolved CF|HB mapping, apply the mapping and save it
res
.joinWith(resolvedDS, res.col("id").equalTo(resolved.col("resultId")), "left")
.groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING())
.agg(new IdCfHbMappingAggregator(entityClazz).toColumn())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
public static class IdCfHbMappingAggregator<T extends Result> extends Aggregator<IdCfHbMapping, T, T> {
private final Class<T> entityClazz;
public IdCfHbMappingAggregator(Class<T> entityClazz) {
this.entityClazz = entityClazz;
}
@Override
public T zero() {
try {
return entityClazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public T reduce(T r, IdCfHbMapping a) {
if (Objects.isNull(a) && StringUtils.isBlank(a.getMaster())) {
return r;
}
r.getCollectedfrom().forEach(kv -> updateKey(kv, a));
r.getInstance().forEach(i -> {
updateKey(i.getHostedby(), a);
updateKey(i.getCollectedfrom(), a);
});
return r;
}
@Override
public T merge(T b1, T b2) {
if (Objects.isNull(b1.getId())) {
return b2;
}
return b1;
}
@Override
public T finish(T r) {
return r;
}
private void updateKey(final KeyValue kv, final IdCfHbMapping a) {
if (kv.getKey().equals(a.getCfhb())) {
kv.setKey(a.getMaster());
}
}
@Override
public Encoder<T> bufferEncoder() {
return Encoders.bean(entityClazz);
}
@Override
public Encoder<T> outputEncoder() {
return Encoders.bean(entityClazz);
}
}
private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) {
IdCfHbMapping m = new IdCfHbMapping(resultId);
m.setCfhb(cfHb);
return m;
}
private static <R> MapFunction<String, R> as(Class<R> clazz) {
return s -> OBJECT_MAPPER.readValue(s, clazz);
}
}

View File

@ -0,0 +1,44 @@
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
import java.io.Serializable;
public class IdCfHbMapping implements Serializable {
private String resultid;
private String cfhb;
private String master;
public IdCfHbMapping() {
}
public IdCfHbMapping(String id) {
this.resultid = id;
}
public String getResultid() {
return resultid;
}
public void setResultid(String resultid) {
this.resultid = resultid;
}
public String getCfhb() {
return cfhb;
}
public void setCfhb(String cfhb) {
this.cfhb = cfhb;
}
public String getMaster() {
return master;
}
public void setMaster(String master) {
this.master = master;
}
}

View File

@ -317,13 +317,13 @@
</switch> </switch>
</decision> </decision>
<fork name="fork_clean_context"> <fork name="fork_clean_context">
<path start="clean_publication_context"/> <path start="clean_publication_context"/>
<path start="clean_dataset_context"/> <path start="clean_dataset_context"/>
<path start="clean_otherresearchproduct_context"/> <path start="clean_otherresearchproduct_context"/>
<path start="clean_software_context"/> <path start="clean_software_context"/>
</fork> </fork>
<action name="clean_publication_context"> <action name="clean_publication_context">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -434,7 +434,6 @@
<join name="wait_clean_context" to="select_datasourceId_from_country"/> <join name="wait_clean_context" to="select_datasourceId_from_country"/>
<action name="select_datasourceId_from_country"> <action name="select_datasourceId_from_country">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -460,13 +459,13 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_clean_country"> <fork name="fork_clean_country">
<path start="clean_publication_country"/> <path start="clean_publication_country"/>
<path start="clean_dataset_country"/> <path start="clean_dataset_country"/>
<path start="clean_otherresearchproduct_country"/> <path start="clean_otherresearchproduct_country"/>
<path start="clean_software_country"/> <path start="clean_software_country"/>
</fork> </fork>
<action name="clean_publication_country"> <action name="clean_publication_country">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -583,7 +582,202 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_clean_country" to="End"/> <join name="wait_clean_country" to="should_patch_datasource_ids"/>
<decision name="should_patch_datasource_ids">
<switch>
<case to="get_ds_master_duplicate">${wf:conf('shouldClean') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="get_ds_master_duplicate">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction</main-class>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="fork_patch_cfhb"/>
<error to="Kill"/>
</action>
<fork name="fork_patch_cfhb">
<path start="patch_publication_cfhb"/>
<path start="patch_dataset_cfhb"/>
<path start="patch_otherresearchproduct_cfhb"/>
<path start="patch_software_cfhb"/>
</fork>
<action name="patch_publication_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch publication cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/publication</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<action name="patch_dataset_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch dataset cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/dataset</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<action name="patch_otherresearchproduct_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch otherresearchproduct cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/otherresearchproduct</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<action name="patch_software_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch software cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/software</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<join name="wait_clean_cfhb" to="fork_copy_cfhb_patched_results"/>
<fork name="fork_copy_cfhb_patched_results">
<path start="copy_cfhb_patched_publication"/>
<path start="copy_cfhb_patched_dataset"/>
<path start="copy_cfhb_patched_otherresearchproduct"/>
<path start="copy_cfhb_patched_software"/>
</fork>
<action name="copy_cfhb_patched_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/publication"/>
</prepare>
<arg>${workingPath}/cfHbPatched/publication</arg>
<arg>${graphOutputPath}/publication</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_cfhb_patched_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/dataset"/>
</prepare>
<arg>${workingPath}/cfHbPatched/dataset</arg>
<arg>${graphOutputPath}/dataset</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_cfhb_patched_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/otherresearchproduct"/>
</prepare>
<arg>${workingPath}/cfHbPatched/otherresearchproduct</arg>
<arg>${graphOutputPath}/otherresearchproduct</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_cfhb_patched_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/software"/>
</prepare>
<arg>${workingPath}/cfHbPatched/software</arg>
<arg>${graphOutputPath}/software</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "pu",
"paramLongName": "postgresUrl",
"paramDescription": "the jdbc url to the postgres",
"paramRequired": true
},
{
"paramName": "uid",
"paramLongName": "postgresUser",
"paramDescription": "the postgres user",
"paramRequired": true
},
{
"paramName": "pwd",
"paramLongName": "postgresPassword",
"paramDescription": "the postgres password=",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "hdfsPath",
"paramDescription": "the target path on HDFS",
"paramRequired": true
},
{
"paramName": "nn",
"paramLongName": "hdfsNameNode",
"paramDescription": "the HDFS nameNode",
"paramRequired": true
}
]

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
},
{
"paramName": "md",
"paramLongName": "datasourceMasterDuplicate",
"paramDescription": "path to the file on HDFS holding the datasource id tuples [master, duplicate]",
"paramRequired": true
}
]

View File

@ -1002,7 +1002,8 @@ class MappersTest {
@Test @Test
void testEOSCFuture_ROHub() throws IOException { void testEOSCFuture_ROHub() throws IOException {
final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml"))); final String xml = IOUtils
.toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml")));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
final OtherResearchProduct rocrate = (OtherResearchProduct) list.get(0); final OtherResearchProduct rocrate = (OtherResearchProduct) list.get(0);
assertNotNull(rocrate.getEoscifguidelines()); assertNotNull(rocrate.getEoscifguidelines());

View File

@ -1,13 +1,14 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.DeserializationFeature; import static org.junit.jupiter.api.Assertions.assertNotNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import java.io.IOException;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import java.io.StringReader;
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import javax.xml.transform.Transformer;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import javax.xml.transform.TransformerException;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
@ -18,71 +19,73 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.xml.transform.Transformer; import com.fasterxml.jackson.databind.DeserializationFeature;
import javax.xml.transform.TransformerException; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringReader;
import static org.junit.jupiter.api.Assertions.assertNotNull; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
public class EOSCFuture_Test { public class EOSCFuture_Test {
public static ObjectMapper OBJECT_MAPPER = new ObjectMapper() public static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static final String VERSION = "2021-04-15T10:05:53Z"; public static final String VERSION = "2021-04-15T10:05:53Z";
public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"; public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl";
private ContextMapper contextMapper; private ContextMapper contextMapper;
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
contextMapper = new ContextMapper(); contextMapper = new ContextMapper();
} }
@Test
public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException {
@Test final ContextMapper contextMapper = new ContextMapper();
public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException {
final ContextMapper contextMapper = new ContextMapper(); final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final OtherResearchProduct p = OBJECT_MAPPER
XmlConverterJob.schemaLocation); .readValue(
IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")),
OtherResearchProduct.class);
final OtherResearchProduct p = OBJECT_MAPPER final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
.readValue(IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")), OtherResearchProduct.class);
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p)); assertNotNull(xml);
assertNotNull(xml); final Document doc = new SAXReader().read(new StringReader(xml));
final Document doc = new SAXReader().read(new StringReader(xml)); assertNotNull(doc);
System.out.println(doc.asXML());
assertNotNull(doc); testRecordTransformation(xml);
System.out.println(doc.asXML()); }
private void testRecordTransformation(final String record) throws IOException, TransformerException {
final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));
testRecordTransformation(xml); final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt);
}
final Transformer tr = SaxonTransformerFactory.newInstance(transformer);
private void testRecordTransformation(final String record) throws IOException, TransformerException { final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record);
final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));
final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt); final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID)
.parseDocument(indexRecordXML);
final Transformer tr = SaxonTransformerFactory.newInstance(transformer); final String xmlDoc = ClientUtils.toXML(solrDoc);
final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record); Assertions.assertNotNull(xmlDoc);
System.out.println(xmlDoc);
final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID) }
.parseDocument(indexRecordXML);
final String xmlDoc = ClientUtils.toXML(solrDoc);
Assertions.assertNotNull(xmlDoc);
System.out.println(xmlDoc);
}
} }