forked from antonis.lempesis/dnet-hadoop
Merge branch 'beta' of https://code-repo.d4science.org/D-Net/dnet-hadoop into beta
This commit is contained in:
commit
e1797c0a42
|
@ -64,26 +64,24 @@ abstract class AbstractRestClient extends Iterator[String]{
|
|||
.setSocketTimeout(timeout * 1000).build()
|
||||
val client =HttpClientBuilder.create().setDefaultRequestConfig(config).build()
|
||||
var tries = 4
|
||||
try {
|
||||
while (tries > 0) {
|
||||
while (tries > 0) {
|
||||
println(s"requesting ${r.getURI}")
|
||||
val response = client.execute(r)
|
||||
println(s"get response with status${response.getStatusLine.getStatusCode}")
|
||||
if (response.getStatusLine.getStatusCode > 400) {
|
||||
tries -= 1
|
||||
try {
|
||||
val response = client.execute(r)
|
||||
println(s"get response with status${response.getStatusLine.getStatusCode}")
|
||||
if (response.getStatusLine.getStatusCode > 400) {
|
||||
tries -= 1
|
||||
}
|
||||
else
|
||||
return IOUtils.toString(response.getEntity.getContent)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
println(s"Error on requesting ${r.getURI}")
|
||||
e.printStackTrace()
|
||||
tries-=1
|
||||
}
|
||||
else
|
||||
return IOUtils.toString(response.getEntity.getContent)
|
||||
}
|
||||
""
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
throw new RuntimeException("Error on executing request ", e)
|
||||
} finally try client.close()
|
||||
catch {
|
||||
case e: IOException =>
|
||||
throw new RuntimeException("Unable to close client ", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
getBufferData()
|
||||
}
|
|
@ -561,6 +561,31 @@ public class MappersTest {
|
|||
assertNotNull(d.getInstance().get(0).getUrl());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEnermaps() throws IOException {
|
||||
final String xml = IOUtils.toString(getClass().getResourceAsStream("enermaps.xml"));
|
||||
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
|
||||
|
||||
System.out.println("***************");
|
||||
System.out.println(new ObjectMapper().writeValueAsString(list));
|
||||
System.out.println("***************");
|
||||
|
||||
assertEquals(1, list.size());
|
||||
assertTrue(list.get(0) instanceof Dataset);
|
||||
|
||||
final Dataset d = (Dataset) list.get(0);
|
||||
|
||||
assertValidId(d.getId());
|
||||
assertValidId(d.getCollectedfrom().get(0).getKey());
|
||||
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
|
||||
assertEquals(1, d.getAuthor().size());
|
||||
assertEquals(1, d.getInstance().size());
|
||||
assertNotNull(d.getInstance().get(0).getUrl());
|
||||
assertNotNull(d.getContext());
|
||||
assertTrue(StringUtils.isNotBlank(d.getContext().get(0).getId()));
|
||||
assertEquals("enermaps::selection::tgs00004", d.getContext().get(0).getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testClaimFromCrossref() throws IOException {
|
||||
final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_crossref.xml"));
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<record xmlns="http://datacite.org/schema/kernel-4"
|
||||
xmlns:dr="http://www.driver-repository.eu/namespace/dr" xmlns:oaf="http://namespace.openaire.eu/oaf">
|
||||
<oai:header xmlns="http://namespace.openaire.eu/"
|
||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
|
||||
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<dri:objIdentifier>enermaps____::04149ee428d07360314c2cb3ba95d41e</dri:objIdentifier>
|
||||
<dri:recordIdentifier>tgs00004</dri:recordIdentifier>
|
||||
<dri:dateOfCollection>2021-07-20T18:43:12.096+02:00</dri:dateOfCollection>
|
||||
<oaf:datasourceprefix>enermaps____</oaf:datasourceprefix>
|
||||
</oai:header>
|
||||
<metadata>
|
||||
<resource>
|
||||
<identifier identifierType="URL">https://ec.europa.eu/eurostat/web/products-datasets/-/tgs00004</identifier>
|
||||
<creators>
|
||||
<creator>
|
||||
<creatorName>Statistical Office of the European Union (Eurostat)</creatorName>
|
||||
</creator>
|
||||
</creators>
|
||||
<titles>
|
||||
<title>
|
||||
Regional GDP
|
||||
</title>
|
||||
</titles>
|
||||
<publisher>Statistical Office of the European Union (Eurostat)</publisher>
|
||||
<publicationYear>2020</publicationYear>
|
||||
<dates>
|
||||
<date dateType="Issued">2020-10-07</date>
|
||||
</dates>
|
||||
<resourceType resourceTypeGeneral="Dataset"/>
|
||||
<rightsList>
|
||||
<rights rightsURI="info:eu-repo/semantics/openAccess">OPEN</rights>
|
||||
<rights rightsURI="https://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International</rights>
|
||||
</rightsList>
|
||||
<descriptions>
|
||||
<description descriptionType="Abstract" xml:lang="EN">GDP expressed in PPS (purchasing power standards) eliminates differences in price levels between countries. Calculations on a per inhabitant basis allow for the comparison of economies and regions significantly different in absolute size. GDP per inhabitant in PPS is the key variable for determining the eligibility of NUTS 2 regions in the framework of the European Unions structural policy.</description>
|
||||
</descriptions>
|
||||
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
|
||||
<oaf:dateAccepted>2020-10-07</oaf:dateAccepted>
|
||||
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||
<oaf:license>Creative Commons Attribution 4.0 International</oaf:license>
|
||||
<oaf:hostedBy
|
||||
id="openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18" name="Unknown Repository"/>
|
||||
<oaf:collectedFrom id="enermaps____::db" name="Enermaps"/>
|
||||
<oaf:concept id="enermaps::selection::tgs00004"/>
|
||||
</resource>
|
||||
</metadata>
|
||||
<about xmlns="" xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
|
||||
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||
<originDescription altered="true" harvestDate="2021-07-20T18:43:12.096+02:00">
|
||||
<baseURL>https%3A%2F%2Flab.idiap.ch%2Fenermaps%2Fapi%2Fdatacite</baseURL>
|
||||
<identifier/>
|
||||
<datestamp/>
|
||||
<metadataNamespace/>
|
||||
</originDescription>
|
||||
</provenance>
|
||||
<oaf:datainfo>
|
||||
<oaf:inferred>false</oaf:inferred>
|
||||
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||
<oaf:trust>0.9</oaf:trust>
|
||||
<oaf:inferenceprovenance/>
|
||||
<oaf:provenanceaction classid="sysimport:crosswalk"
|
||||
classname="sysimport:crosswalk"
|
||||
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
||||
</oaf:datainfo>
|
||||
</about>
|
||||
</record>
|
|
@ -9,6 +9,41 @@
|
|||
|
||||
<artifactId>dhp-graph-provision</artifactId>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>net.alchim31.maven</groupId>
|
||||
<artifactId>scala-maven-plugin</artifactId>
|
||||
<version>4.0.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>scala-compile-first</id>
|
||||
<phase>initialize</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>scala-test-compile</id>
|
||||
<phase>process-test-resources</phase>
|
||||
<goals>
|
||||
<goal>testCompile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<args>
|
||||
<arg>-Xmax-classfile-name</arg>
|
||||
<arg>200</arg>
|
||||
</args>
|
||||
<scalaVersion>${scala.version}</scalaVersion>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
|
|
|
@ -43,7 +43,7 @@ object SparkCreateActionset {
|
|||
val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
|
||||
|
||||
relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
|
||||
.flatMap(r => List(r.getSource,r.getTarget)).distinct().write.save(s"$workingDirFolder/id_relation")
|
||||
.flatMap(r => List(r.getSource,r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
|
||||
|
||||
|
||||
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
|
||||
|
@ -56,35 +56,18 @@ object SparkCreateActionset {
|
|||
relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
|
||||
|
||||
log.info("saving publication")
|
||||
log.info("saving entities")
|
||||
|
||||
val publication:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/publication").as[Result].map(p => (p.getId, p))
|
||||
val entities:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
|
||||
|
||||
publication
|
||||
.joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
|
||||
|
||||
entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
|
||||
entities
|
||||
.joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
|
||||
.map(p => p._1._2)
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
|
||||
|
||||
log.info("saving dataset")
|
||||
val dataset:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/dataset").as[Result].map(p => (p.getId, p))
|
||||
dataset
|
||||
.joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
|
||||
.map(p => p._1._2)
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
|
||||
|
||||
log.info("saving software")
|
||||
val software:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/software").as[Result].map(p => (p.getId, p))
|
||||
software
|
||||
.joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
|
||||
.map(p => p._1._2)
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
|
||||
|
||||
log.info("saving Other Research product")
|
||||
val orp:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/otherresearchproduct").as[Result].map(p => (p.getId, p))
|
||||
orp
|
||||
.joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
|
||||
.map(p => p._1._2)
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ExportDataset"/>
|
||||
<start to="createActionSet"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
|
@ -26,7 +26,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Create Action Set</name>
|
||||
<class>eu.dnetlib.dhp.sx.provision.SparkCreateActionset</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
|
@ -42,7 +42,7 @@
|
|||
<arg>--workingDirFolder</arg><arg>${workingDirFolder}</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<ok to="SaveActionSet"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
@ -53,7 +53,7 @@
|
|||
<mode>cluster</mode>
|
||||
<name>Save Action Set</name>
|
||||
<class>eu.dnetlib.dhp.sx.provision.SparkSaveActionSet</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
|
|
|
@ -21,8 +21,10 @@ import com.google.common.collect.Lists;
|
|||
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
|
||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
@ -131,4 +133,32 @@ public class XmlRecordFactoryTest {
|
|||
System.out.println(doc.asXML());
|
||||
assertEquals("", doc.valueOf("//rel/validated"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnermapsRecord() throws IOException, DocumentException {
|
||||
|
||||
String contextmap = "<entries><entry id=\"enermaps\" label=\"Energy Research\" name=\"context\" type=\"community\"/>"
|
||||
+
|
||||
"<entry id=\"enermaps::selection\" label=\"Featured dataset\" name=\"category\"/>" +
|
||||
"<entry id=\"enermaps::selection::tgs00004\" label=\"Dataset title\" name=\"concept\"/>" +
|
||||
"</entries>";
|
||||
|
||||
ContextMapper contextMapper = ContextMapper.fromXml(contextmap);
|
||||
XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, XmlConverterJob.schemaLocation,
|
||||
otherDsTypeId);
|
||||
|
||||
Dataset d = OBJECT_MAPPER
|
||||
.readValue(IOUtils.toString(getClass().getResourceAsStream("enermaps.json")), Dataset.class);
|
||||
|
||||
JoinedEntity je = new JoinedEntity<>(d);
|
||||
|
||||
String xml = xmlRecordFactory.build(je);
|
||||
|
||||
assertNotNull(xml);
|
||||
|
||||
Document doc = new SAXReader().read(new StringReader(xml));
|
||||
assertNotNull(doc);
|
||||
System.out.println(doc.asXML());
|
||||
assertEquals("enermaps::selection::tgs00004", doc.valueOf("//concept/@id"));
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue