graph cleaning, suggestions from ticket 8898 - round 2 #356
|
@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
|
@ -57,11 +58,14 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
|
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
final String inputPath = parser.get("inputPath");
|
final String crossrefInputPath = parser.get("crossrefInputPath");
|
||||||
log.info("inputPath {}: ", inputPath);
|
log.info("crossrefInputPath: {}", crossrefInputPath);
|
||||||
|
|
||||||
|
final String pubmedInputPath = parser.get("pubmedInputPath");
|
||||||
|
log.info("pubmedInputPath: {}", pubmedInputPath);
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath {}: ", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
@ -70,12 +74,28 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Constants.removeOutputDir(spark, outputPath);
|
Constants.removeOutputDir(spark, outputPath);
|
||||||
prepareAffiliationRelations(spark, inputPath, outputPath);
|
|
||||||
|
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||||
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
||||||
|
spark, crossrefInputPath, collectedFromCrossref);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromPubmed = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
||||||
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
||||||
|
spark, pubmedInputPath, collectedFromPubmed);
|
||||||
|
|
||||||
|
crossrefRelations
|
||||||
|
.union(pubmedRelations)
|
||||||
|
.saveAsHadoopFile(
|
||||||
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath,
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
||||||
String outputPath) {
|
String inputPath,
|
||||||
|
List<KeyValue> collectedfrom) {
|
||||||
|
|
||||||
// load and parse affiliation relations from HDFS
|
// load and parse affiliation relations from HDFS
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
|
@ -92,7 +112,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
new Column("matching.Confidence").as("confidence"));
|
new Column("matching.Confidence").as("confidence"));
|
||||||
|
|
||||||
// prepare action sets for affiliation relations
|
// prepare action sets for affiliation relations
|
||||||
df
|
return df
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
||||||
|
|
||||||
|
@ -120,8 +140,6 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
qualifier,
|
qualifier,
|
||||||
Double.toString(row.getAs("confidence")));
|
Double.toString(row.getAs("confidence")));
|
||||||
|
|
||||||
List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
|
||||||
|
|
||||||
// return bi-directional relations
|
// return bi-directional relations
|
||||||
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
||||||
|
|
||||||
|
@ -129,9 +147,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.map(p -> new AtomicAction(Relation.class, p))
|
.map(p -> new AtomicAction(Relation.class, p))
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
||||||
|
|
|
@ -6,9 +6,15 @@
|
||||||
"paramRequired": false
|
"paramRequired": false
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "ip",
|
"paramName": "cip",
|
||||||
"paramLongName": "inputPath",
|
"paramLongName": "crossrefInputPath",
|
||||||
"paramDescription": "the URL from where to get the programme file",
|
"paramDescription": "the path to get the input data from Crossref",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "pip",
|
||||||
|
"paramLongName": "pubmedInputPath",
|
||||||
|
"paramDescription": "the path to get the input data from Pubmed",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -31,5 +31,6 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
|
||||||
# The following is needed as a property of a workflow
|
# The following is needed as a property of a workflow
|
||||||
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
||||||
|
|
||||||
inputPath=/data/bip-affiliations/data.json
|
crossrefInputPath=/data/bip-affiliations/data.json
|
||||||
|
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
|
||||||
outputPath=/tmp/crossref-affiliations-output-v5
|
outputPath=/tmp/crossref-affiliations-output-v5
|
||||||
|
|
|
@ -2,8 +2,12 @@
|
||||||
<parameters>
|
<parameters>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>inputPath</name>
|
<name>crossrefInputPath</name>
|
||||||
<description>the path where to find the inferred affiliation relations</description>
|
<description>the path where to find the inferred affiliation relations from Crossref</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>pubmedInputPath</name>
|
||||||
|
<description>the path where to find the inferred affiliation relations from Pubmed</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
|
@ -83,7 +87,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name>
|
<name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
@ -96,7 +100,8 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
<arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
|
||||||
|
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
|
@ -74,17 +74,22 @@ public class PrepareAffiliationRelationsTest {
|
||||||
@Test
|
@Test
|
||||||
void testMatch() throws Exception {
|
void testMatch() throws Exception {
|
||||||
|
|
||||||
String affiliationRelationsPath = getClass()
|
String crossrefAffiliationRelationPath = getClass()
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
|
String pubmedAffiliationRelationsPath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
String outputPath = workingDir.toString() + "/actionSet";
|
String outputPath = workingDir.toString() + "/actionSet";
|
||||||
|
|
||||||
PrepareAffiliationRelations
|
PrepareAffiliationRelations
|
||||||
.main(
|
.main(
|
||||||
new String[] {
|
new String[] {
|
||||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
"-inputPath", affiliationRelationsPath,
|
"-crossrefInputPath", crossrefAffiliationRelationPath,
|
||||||
|
"-pubmedInputPath", pubmedAffiliationRelationsPath,
|
||||||
"-outputPath", outputPath
|
"-outputPath", outputPath
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -101,7 +106,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// );
|
// );
|
||||||
// }
|
// }
|
||||||
// count the number of relations
|
// count the number of relations
|
||||||
assertEquals(20, tmp.count());
|
assertEquals(40, tmp.count());
|
||||||
|
|
||||||
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
||||||
dataset.createOrReplaceTempView("result");
|
dataset.createOrReplaceTempView("result");
|
||||||
|
@ -112,7 +117,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// verify that we have equal number of bi-directional relations
|
// verify that we have equal number of bi-directional relations
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
10, execVerification
|
20, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -120,7 +125,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
10, execVerification
|
20, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
|
|
@ -29,7 +29,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
||||||
|
|
||||||
protected final VocabularyGroup vocs;
|
protected final VocabularyGroup vocs;
|
||||||
|
|
||||||
protected static final UrlValidator URL_VALIDATOR = UrlValidator.getInstance();
|
protected static final UrlValidator URL_VALIDATOR = new UrlValidator(UrlValidator.ALLOW_2_SLASHES);
|
||||||
|
|
||||||
private final boolean invisible;
|
private final boolean invisible;
|
||||||
|
|
||||||
|
|
|
@ -797,6 +797,20 @@ class MappersTest {
|
||||||
assertFalse(p_cleaned.getTitle().isEmpty());
|
assertFalse(p_cleaned.getTitle().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test_instance_url_validation() throws IOException {
|
||||||
|
final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("idus_sevilla.xml")));
|
||||||
|
final List<Oaf> list = new OafToOafMapper(vocs, false, true).processMdRecord(xml);
|
||||||
|
|
||||||
|
final Publication p = (Publication) list.get(0);
|
||||||
|
|
||||||
|
assertNotNull(p.getInstance());
|
||||||
|
assertFalse(p.getInstance().isEmpty());
|
||||||
|
assertNotNull(p.getInstance().get(0).getUrl());
|
||||||
|
assertFalse(p.getInstance().get(0).getUrl().isEmpty());
|
||||||
|
assertEquals("https://idus.us.es/handle//11441/118940", p.getInstance().get(0).getUrl().get(0));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testZenodo() throws IOException, DocumentException {
|
void testZenodo() throws IOException, DocumentException {
|
||||||
final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("odf_zenodo.xml")));
|
final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("odf_zenodo.xml")));
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<record xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||||
|
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
|
||||||
|
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||||
|
xmlns:oaf="http://namespace.openaire.eu/oaf"
|
||||||
|
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
|
||||||
|
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance"
|
||||||
|
xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||||
|
<header xmlns="http://namespace.openaire.eu/">
|
||||||
|
<dri:objIdentifier>od______3272::6a4d00217a024a46ce9697ce98b13c2a</dri:objIdentifier>
|
||||||
|
<dri:recordIdentifier>oai:idus.us.es:11441/118940</dri:recordIdentifier>
|
||||||
|
<dri:dateOfCollection/>
|
||||||
|
<dri:mdFormat/>
|
||||||
|
<dri:mdFormatInterpretation/>
|
||||||
|
<dri:repositoryId/>
|
||||||
|
<dr:objectIdentifier/>
|
||||||
|
<dr:dateOfCollection>2021-08-20T12:32:32.826Z</dr:dateOfCollection>
|
||||||
|
<dr:dateOfTransformation>2023-07-04T15:47:55.397Z</dr:dateOfTransformation>
|
||||||
|
<oaf:datasourceprefix>od______3272</oaf:datasourceprefix>
|
||||||
|
</header>
|
||||||
|
<metadata xmlns="http://namespace.openaire.eu/">
|
||||||
|
<dc:title>El museo pictorico y escala optica : tomo I : theorica de la pintura en que se describe su origen ... y se aprueban con demonstraciomes mathematicas y filosoficas, sus mas radicales fundamentos</dc:title>
|
||||||
|
<dc:creator>Palomino de Castro y Velasco, Antonio, 1653-1726</dc:creator>
|
||||||
|
<dc:contributor>Rovira y Brocandel, Hipólito, 1693-1765</dc:contributor>
|
||||||
|
<dc:contributor>Palomino de Castro y Velasco, Antonio, 1653-1726</dc:contributor>
|
||||||
|
<dc:date>2021-08-12T08:59:53Z</dc:date>
|
||||||
|
<dc:date>1715</dc:date>
|
||||||
|
<dc:description>A 042(a)/063</dc:description>
|
||||||
|
<dc:format>application/pdf</dc:format>
|
||||||
|
<dc:identifier>https://idus.us.es/handle//11441/118940</dc:identifier>
|
||||||
|
<dc:language>spa</dc:language>
|
||||||
|
<dc:publisher>En Madrid : por Lucas Antonio de Bedmar ... : vendese en casa de Don Joseph de Villar y Villanueva, 1715</dc:publisher>
|
||||||
|
<dc:type>info:eu-repo/semantics/book</dc:type>
|
||||||
|
<dc:type>info:eu-repo/semantics/publishedVersion</dc:type>
|
||||||
|
<dr:CobjCategory type="publication">0002</dr:CobjCategory>
|
||||||
|
<oaf:dateAccepted>1715-01-01</oaf:dateAccepted>
|
||||||
|
<oaf:embargoenddate/>
|
||||||
|
<oaf:collectedDatasourceid>opendoar____::3272</oaf:collectedDatasourceid>
|
||||||
|
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||||
|
<oaf:hostedBy id="opendoar____::3272" name="idUS. Depósito de Investigación de la Universidad de Sevilla."/>
|
||||||
|
<oaf:collectedFrom id="opendoar____::3272" name="idUS. Depósito de Investigación de la Universidad de Sevilla."/>
|
||||||
|
<oaf:identifier identifierType="landingPage">https://idus.us.es/handle//11441/118940</oaf:identifier>
|
||||||
|
<oaf:journal eissn="" ep="" iss="" issn="" sp="" vol=""/>
|
||||||
|
<oaf:license>http://creativecommons.org/licenses/by-nc-nd/4.0/</oaf:license>
|
||||||
|
</metadata>
|
||||||
|
<about>
|
||||||
|
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||||
|
<originDescription altered="true" harvestDate="2021-08-20T12:32:32.826Z">
|
||||||
|
<baseURL>http%3A%2F%2Fidus.us.es%2Foai%2Fdriver</baseURL>
|
||||||
|
<identifier>oai:idus.us.es:11441/118940</identifier>
|
||||||
|
<datestamp>2021-08-12T08:59:54Z</datestamp>
|
||||||
|
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
|
||||||
|
</originDescription>
|
||||||
|
</provenance>
|
||||||
|
<oaf:datainfo>
|
||||||
|
<oaf:inferred>false</oaf:inferred>
|
||||||
|
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||||
|
<oaf:trust>0.9</oaf:trust>
|
||||||
|
<oaf:inferenceprovenance/>
|
||||||
|
<oaf:provenanceaction classid="sysimport:crosswalk:repository"
|
||||||
|
classname="sysimport:crosswalk:repository"
|
||||||
|
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
||||||
|
</oaf:datainfo>
|
||||||
|
</about>
|
||||||
|
</record>
|
Loading…
Reference in New Issue