forked from D-Net/dnet-hadoop
Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop
This commit is contained in:
commit
5f0906be60
|
@ -27,7 +27,8 @@ object SparkCreateBaselineDataFrame {
|
||||||
def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = {
|
def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = {
|
||||||
val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
|
val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
|
||||||
|
|
||||||
val result = data.linesWithSeparators.map(l => l.stripLineEnd)
|
val result = data.linesWithSeparators
|
||||||
|
.map(l => l.stripLineEnd)
|
||||||
.filter(l => l.startsWith("<a href="))
|
.filter(l => l.startsWith("<a href="))
|
||||||
.map { l =>
|
.map { l =>
|
||||||
val end = l.lastIndexOf("\">")
|
val end = l.lastIndexOf("\">")
|
||||||
|
|
|
@ -130,7 +130,7 @@
|
||||||
</xsl:if>
|
</xsl:if>
|
||||||
|
|
||||||
<oaf:hostedBy name="{$varOfficialName}" id="{$varDataSourceId}" />
|
<oaf:hostedBy name="{$varOfficialName}" id="{$varDataSourceId}" />
|
||||||
<oaf:collectedFrom name="{$varOfficialName}" id="{$varDataSourceId}ß" />
|
<oaf:collectedFrom name="{$varOfficialName}" id="{$varDataSourceId}" />
|
||||||
|
|
||||||
<xsl:variable name="varKnownFileEndings" select="('.bmp', '.doc', '.docx', '.epub', '.flv', '.jpeg', '.jpg', '.m4v', '.mp4', '.mpg', '.odp', '.pdf', '.png', '.ppt', '.tiv', '.txt', '.xls', '.xlsx', '.zip')" />
|
<xsl:variable name="varKnownFileEndings" select="('.bmp', '.doc', '.docx', '.epub', '.flv', '.jpeg', '.jpg', '.m4v', '.mp4', '.mpg', '.odp', '.pdf', '.png', '.ppt', '.tiv', '.txt', '.xls', '.xlsx', '.zip')" />
|
||||||
<xsl:variable name="varIdDoi" select="distinct-values((//dc:identifier[starts-with(., '10.')][matches(., '(10[.][0-9]{4,}[^\s/>]*/[^\s>]+)')], //dc:identifier[starts-with(., 'http') and (contains(., '://dx.doi.org/10.') or contains(., '://doi.org/10.'))]/substring-after(., 'doi.org/'), //dc:identifier[starts-with(lower-case(.), 'doi:10.')]/substring-after(lower-case(.), 'doi:')))" />
|
<xsl:variable name="varIdDoi" select="distinct-values((//dc:identifier[starts-with(., '10.')][matches(., '(10[.][0-9]{4,}[^\s/>]*/[^\s>]+)')], //dc:identifier[starts-with(., 'http') and (contains(., '://dx.doi.org/10.') or contains(., '://doi.org/10.'))]/substring-after(., 'doi.org/'), //dc:identifier[starts-with(lower-case(.), 'doi:10.')]/substring-after(lower-case(.), 'doi:')))" />
|
||||||
|
|
|
@ -63,7 +63,9 @@ class BioScholixTest extends AbstractVocabularyTest {
|
||||||
val records: String = Source
|
val records: String = Source
|
||||||
.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump"))
|
.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump"))
|
||||||
.mkString
|
.mkString
|
||||||
val r: List[Oaf] = records.linesWithSeparators.map(l => l.stripLineEnd).toList
|
val r: List[Oaf] = records.linesWithSeparators
|
||||||
|
.map(l => l.stripLineEnd)
|
||||||
|
.toList
|
||||||
.map(s => mapper.readValue(s, classOf[PMArticle]))
|
.map(s => mapper.readValue(s, classOf[PMArticle]))
|
||||||
.map(a => PubMedToOaf.convert(a, vocabularies))
|
.map(a => PubMedToOaf.convert(a, vocabularies))
|
||||||
assertEquals(10, r.size)
|
assertEquals(10, r.size)
|
||||||
|
@ -175,7 +177,8 @@ class BioScholixTest extends AbstractVocabularyTest {
|
||||||
.mkString
|
.mkString
|
||||||
records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty))
|
records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty))
|
||||||
|
|
||||||
val result: List[Oaf] = records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.pdbTOOaf(o))
|
val result: List[Oaf] =
|
||||||
|
records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.pdbTOOaf(o))
|
||||||
|
|
||||||
assertTrue(result.nonEmpty)
|
assertTrue(result.nonEmpty)
|
||||||
result.foreach(r => assertNotNull(r))
|
result.foreach(r => assertNotNull(r))
|
||||||
|
@ -196,7 +199,8 @@ class BioScholixTest extends AbstractVocabularyTest {
|
||||||
.mkString
|
.mkString
|
||||||
records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty))
|
records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty))
|
||||||
|
|
||||||
val result: List[Oaf] = records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.uniprotToOAF(o))
|
val result: List[Oaf] =
|
||||||
|
records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.uniprotToOAF(o))
|
||||||
|
|
||||||
assertTrue(result.nonEmpty)
|
assertTrue(result.nonEmpty)
|
||||||
result.foreach(r => assertNotNull(r))
|
result.foreach(r => assertNotNull(r))
|
||||||
|
@ -241,7 +245,8 @@ class BioScholixTest extends AbstractVocabularyTest {
|
||||||
.mkString
|
.mkString
|
||||||
records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty))
|
records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty))
|
||||||
|
|
||||||
val result: List[Oaf] = records.linesWithSeparators.map(l => l.stripLineEnd).map(s => BioDBToOAF.crossrefLinksToOaf(s)).toList
|
val result: List[Oaf] =
|
||||||
|
records.linesWithSeparators.map(l => l.stripLineEnd).map(s => BioDBToOAF.crossrefLinksToOaf(s)).toList
|
||||||
|
|
||||||
assertNotNull(result)
|
assertNotNull(result)
|
||||||
assertTrue(result.nonEmpty)
|
assertTrue(result.nonEmpty)
|
||||||
|
@ -280,10 +285,13 @@ class BioScholixTest extends AbstractVocabularyTest {
|
||||||
|
|
||||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||||
|
|
||||||
val l: List[ScholixResolved] = records.linesWithSeparators.map(l => l.stripLineEnd).map { input =>
|
val l: List[ScholixResolved] = records.linesWithSeparators
|
||||||
lazy val json = parse(input)
|
.map(l => l.stripLineEnd)
|
||||||
json.extract[ScholixResolved]
|
.map { input =>
|
||||||
}.toList
|
lazy val json = parse(input)
|
||||||
|
json.extract[ScholixResolved]
|
||||||
|
}
|
||||||
|
.toList
|
||||||
|
|
||||||
val result: List[Oaf] = l.map(s => BioDBToOAF.scholixResolvedToOAF(s))
|
val result: List[Oaf] = l.map(s => BioDBToOAF.scholixResolvedToOAF(s))
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,10 @@
|
||||||
package eu.dnetlib.dhp.broker.oa.util;
|
package eu.dnetlib.dhp.broker.oa.util;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -80,7 +83,7 @@ public class ConversionUtils {
|
||||||
res.setOpenaireId(cleanOpenaireId(d.getId()));
|
res.setOpenaireId(cleanOpenaireId(d.getId()));
|
||||||
res.setOriginalId(first(d.getOriginalId()));
|
res.setOriginalId(first(d.getOriginalId()));
|
||||||
res.setTitle(structPropValue(d.getTitle()));
|
res.setTitle(structPropValue(d.getTitle()));
|
||||||
res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
|
res.setPids(allResultPids(d));
|
||||||
res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
||||||
res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
|
res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
|
||||||
return res;
|
return res;
|
||||||
|
@ -95,7 +98,7 @@ public class ConversionUtils {
|
||||||
res.setOpenaireId(cleanOpenaireId(p.getId()));
|
res.setOpenaireId(cleanOpenaireId(p.getId()));
|
||||||
res.setOriginalId(first(p.getOriginalId()));
|
res.setOriginalId(first(p.getOriginalId()));
|
||||||
res.setTitle(structPropValue(p.getTitle()));
|
res.setTitle(structPropValue(p.getTitle()));
|
||||||
res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
|
res.setPids(allResultPids(p));
|
||||||
res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
||||||
res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
|
res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
|
||||||
|
|
||||||
|
@ -124,7 +127,7 @@ public class ConversionUtils {
|
||||||
res
|
res
|
||||||
.setJournal(
|
.setJournal(
|
||||||
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
|
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
|
||||||
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
|
res.setPids(allResultPids(result));
|
||||||
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
|
||||||
res
|
res
|
||||||
.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
|
.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
|
||||||
|
@ -132,6 +135,26 @@ public class ConversionUtils {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static List<OaBrokerTypedValue> allResultPids(final Result result) {
|
||||||
|
final Map<String, StructuredProperty> map = new HashMap<>();
|
||||||
|
|
||||||
|
if (result.getPid() != null) {
|
||||||
|
result.getPid().forEach(sp -> map.put(sp.getValue(), sp));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.getInstance() != null) {
|
||||||
|
result.getInstance().forEach(i -> {
|
||||||
|
if (i.getPid() != null) {
|
||||||
|
i.getPid().forEach(sp -> map.put(sp.getValue(), sp));
|
||||||
|
}
|
||||||
|
if (i.getAlternateIdentifier() != null) {
|
||||||
|
i.getAlternateIdentifier().forEach(sp -> map.put(sp.getValue(), sp));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return mappedList(map.values(), ConversionUtils::oafPidToBrokerPid);
|
||||||
|
}
|
||||||
|
|
||||||
public static String cleanOpenaireId(final String id) {
|
public static String cleanOpenaireId(final String id) {
|
||||||
return id.contains("|") ? StringUtils.substringAfter(id, "|") : id;
|
return id.contains("|") ? StringUtils.substringAfter(id, "|") : id;
|
||||||
}
|
}
|
||||||
|
@ -304,7 +327,7 @@ public class ConversionUtils {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
|
private static <F, T> List<T> mappedList(final Collection<F> list, final Function<F, T> func) {
|
||||||
if (list == null) {
|
if (list == null) {
|
||||||
return new ArrayList<>();
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa.util;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
|
||||||
|
class ConversionUtilsTest {
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAllResultPids() {
|
||||||
|
final Qualifier qf = new Qualifier();
|
||||||
|
qf.setClassid("test");
|
||||||
|
qf.setClassname("test");
|
||||||
|
qf.setSchemeid("test");
|
||||||
|
qf.setSchemename("test");
|
||||||
|
|
||||||
|
final StructuredProperty sp1 = new StructuredProperty();
|
||||||
|
sp1.setValue("1");
|
||||||
|
sp1.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp2 = new StructuredProperty();
|
||||||
|
sp2.setValue("2");
|
||||||
|
sp2.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp3 = new StructuredProperty();
|
||||||
|
sp3.setValue("3");
|
||||||
|
sp3.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp4a = new StructuredProperty();
|
||||||
|
sp4a.setValue("4");
|
||||||
|
sp4a.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp4b = new StructuredProperty();
|
||||||
|
sp4b.setValue("4");
|
||||||
|
sp4b.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp5 = new StructuredProperty();
|
||||||
|
sp5.setValue("5");
|
||||||
|
sp5.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp6a = new StructuredProperty();
|
||||||
|
sp6a.setValue("6");
|
||||||
|
sp6a.setQualifier(qf);
|
||||||
|
|
||||||
|
final StructuredProperty sp6b = new StructuredProperty();
|
||||||
|
sp6b.setValue("6");
|
||||||
|
sp6b.setQualifier(qf);
|
||||||
|
|
||||||
|
final Result oaf = new Result();
|
||||||
|
oaf.setPid(new ArrayList<>());
|
||||||
|
oaf.getPid().add(sp1);
|
||||||
|
oaf.getPid().add(sp2);
|
||||||
|
oaf.getPid().add(sp4a);
|
||||||
|
|
||||||
|
final Instance instance1 = new Instance();
|
||||||
|
instance1.setPid(new ArrayList<>());
|
||||||
|
instance1.setAlternateIdentifier(new ArrayList<>());
|
||||||
|
instance1.getPid().add(sp3);
|
||||||
|
instance1.getPid().add(sp4b);
|
||||||
|
instance1.getAlternateIdentifier().add(sp5);
|
||||||
|
instance1.getAlternateIdentifier().add(sp6a);
|
||||||
|
|
||||||
|
final Instance instance2 = new Instance();
|
||||||
|
instance2.setPid(new ArrayList<>());
|
||||||
|
instance2.setAlternateIdentifier(new ArrayList<>());
|
||||||
|
instance2.getPid().add(sp6b);
|
||||||
|
|
||||||
|
oaf.setInstance(new ArrayList<>());
|
||||||
|
oaf.getInstance().add(instance1);
|
||||||
|
oaf.getInstance().add(instance2);
|
||||||
|
|
||||||
|
final List<OaBrokerTypedValue> list = ConversionUtils.allResultPids(oaf);
|
||||||
|
|
||||||
|
// list.forEach(x -> System.out.println(x.getValue()));
|
||||||
|
|
||||||
|
assertEquals(6, list.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -25,9 +25,11 @@ class MappingORCIDToOAFTest {
|
||||||
.mkString
|
.mkString
|
||||||
assertNotNull(json)
|
assertNotNull(json)
|
||||||
assertFalse(json.isEmpty)
|
assertFalse(json.isEmpty)
|
||||||
json.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => {
|
json.linesWithSeparators
|
||||||
assertNotNull(ORCIDToOAF.extractValueFromInputString(s))
|
.map(l => l.stripLineEnd)
|
||||||
})
|
.foreach(s => {
|
||||||
|
assertNotNull(ORCIDToOAF.extractValueFromInputString(s))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -14,6 +14,7 @@ import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -84,19 +85,26 @@ public class SparkCountryPropagationJob {
|
||||||
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
|
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
|
||||||
|
|
||||||
log.info("Reading prepared info: {}", preparedInfoPath);
|
log.info("Reading prepared info: {}", preparedInfoPath);
|
||||||
Dataset<ResultCountrySet> prepared = spark
|
final Dataset<Row> preparedInfoRaw = spark
|
||||||
.read()
|
.read()
|
||||||
.json(preparedInfoPath)
|
.json(preparedInfoPath);
|
||||||
.as(Encoders.bean(ResultCountrySet.class));
|
|
||||||
|
|
||||||
res
|
|
||||||
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
|
|
||||||
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
|
|
||||||
.write()
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.json(outputPath);
|
|
||||||
|
|
||||||
|
if (!preparedInfoRaw.isEmpty()) {
|
||||||
|
final Dataset<ResultCountrySet> prepared = preparedInfoRaw.as(Encoders.bean(ResultCountrySet.class));
|
||||||
|
res
|
||||||
|
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
|
||||||
|
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
|
||||||
|
.write()
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(outputPath);
|
||||||
|
} else {
|
||||||
|
res
|
||||||
|
.write()
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(outputPath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
||||||
|
|
|
@ -105,13 +105,13 @@
|
||||||
<join name="copy_wait" to="fork_exec_bulktag"/>
|
<join name="copy_wait" to="fork_exec_bulktag"/>
|
||||||
|
|
||||||
<fork name="fork_exec_bulktag">
|
<fork name="fork_exec_bulktag">
|
||||||
<path start="join_bulktag_publication"/>
|
<path start="bulktag_publication"/>
|
||||||
<path start="join_bulktag_dataset"/>
|
<path start="bulktag_dataset"/>
|
||||||
<path start="join_bulktag_otherresearchproduct"/>
|
<path start="bulktag_otherresearchproduct"/>
|
||||||
<path start="join_bulktag_software"/>
|
<path start="bulktag_software"/>
|
||||||
</fork>
|
</fork>
|
||||||
|
|
||||||
<action name="join_bulktag_publication">
|
<action name="bulktag_publication">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
|
@ -138,7 +138,7 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="join_bulktag_dataset">
|
<action name="bulktag_dataset">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
|
@ -165,7 +165,7 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="join_bulktag_otherresearchproduct">
|
<action name="bulktag_otherresearchproduct">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
|
@ -192,7 +192,7 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="join_bulktag_software">
|
<action name="bulktag_software">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
|
@ -269,7 +269,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>EOSC_tagging</name>
|
<name>EOSC tagging publication</name>
|
||||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
@ -296,7 +296,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>EOSC_tagging</name>
|
<name>EOSC tagging dataset</name>
|
||||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
@ -322,7 +322,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>EOSC_tagging</name>
|
<name>EOSC tagging software</name>
|
||||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
@ -348,7 +348,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>EOSC_tagging</name>
|
<name>EOSC tagging ORP</name>
|
||||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
|
|
@ -582,201 +582,7 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait_clean_country" to="should_patch_datasource_ids"/>
|
<join name="wait_clean_country" to="End"/>
|
||||||
|
|
||||||
<decision name="should_patch_datasource_ids">
|
|
||||||
<switch>
|
|
||||||
<case to="get_ds_master_duplicate">${wf:conf('shouldClean') eq true}</case>
|
|
||||||
<default to="End"/>
|
|
||||||
</switch>
|
|
||||||
</decision>
|
|
||||||
|
|
||||||
<action name="get_ds_master_duplicate">
|
|
||||||
<java>
|
|
||||||
<main-class>eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction</main-class>
|
|
||||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
|
||||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
|
||||||
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
|
|
||||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
|
||||||
</java>
|
|
||||||
<ok to="fork_patch_cfhb"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<fork name="fork_patch_cfhb">
|
|
||||||
<path start="patch_publication_cfhb"/>
|
|
||||||
<path start="patch_dataset_cfhb"/>
|
|
||||||
<path start="patch_otherresearchproduct_cfhb"/>
|
|
||||||
<path start="patch_software_cfhb"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="patch_publication_cfhb">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>patch publication cfhb</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
|
||||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/publication</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/publication</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
|
||||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_clean_cfhb"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="patch_dataset_cfhb">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>patch dataset cfhb</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
|
||||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/dataset</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/dataset</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
|
||||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_clean_cfhb"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="patch_otherresearchproduct_cfhb">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>patch otherresearchproduct cfhb</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
|
||||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/otherresearchproduct</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
|
||||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_clean_cfhb"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="patch_software_cfhb">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>patch software cfhb</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
|
||||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/software</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/software</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
|
||||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_clean_cfhb"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="wait_clean_cfhb" to="fork_copy_cfhb_patched_results"/>
|
|
||||||
|
|
||||||
<fork name="fork_copy_cfhb_patched_results">
|
|
||||||
<path start="copy_cfhb_patched_publication"/>
|
|
||||||
<path start="copy_cfhb_patched_dataset"/>
|
|
||||||
<path start="copy_cfhb_patched_otherresearchproduct"/>
|
|
||||||
<path start="copy_cfhb_patched_software"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="copy_cfhb_patched_publication">
|
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
|
||||||
<prepare>
|
|
||||||
<delete path="${graphOutputPath}/publication"/>
|
|
||||||
</prepare>
|
|
||||||
<arg>${workingDir}/cfHbPatched/publication</arg>
|
|
||||||
<arg>${graphOutputPath}/publication</arg>
|
|
||||||
</distcp>
|
|
||||||
<ok to="copy_wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="copy_cfhb_patched_dataset">
|
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
|
||||||
<prepare>
|
|
||||||
<delete path="${graphOutputPath}/dataset"/>
|
|
||||||
</prepare>
|
|
||||||
<arg>${workingDir}/cfHbPatched/dataset</arg>
|
|
||||||
<arg>${graphOutputPath}/dataset</arg>
|
|
||||||
</distcp>
|
|
||||||
<ok to="copy_wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="copy_cfhb_patched_otherresearchproduct">
|
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
|
||||||
<prepare>
|
|
||||||
<delete path="${graphOutputPath}/otherresearchproduct"/>
|
|
||||||
</prepare>
|
|
||||||
<arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
|
|
||||||
<arg>${graphOutputPath}/otherresearchproduct</arg>
|
|
||||||
</distcp>
|
|
||||||
<ok to="copy_wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="copy_cfhb_patched_software">
|
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
|
||||||
<prepare>
|
|
||||||
<delete path="${graphOutputPath}/software"/>
|
|
||||||
</prepare>
|
|
||||||
<arg>${workingDir}/cfHbPatched/software</arg>
|
|
||||||
<arg>${graphOutputPath}/software</arg>
|
|
||||||
</distcp>
|
|
||||||
<ok to="copy_wait"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="copy_wait" to="End"/>
|
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,8 @@ class ResolveEntitiesTest extends Serializable {
|
||||||
def generateUpdates(spark: SparkSession): Unit = {
|
def generateUpdates(spark: SparkSession): Unit = {
|
||||||
val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString
|
val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString
|
||||||
|
|
||||||
val pids: List[String] = template.linesWithSeparators.map(l => l.stripLineEnd)
|
val pids: List[String] = template.linesWithSeparators
|
||||||
|
.map(l => l.stripLineEnd)
|
||||||
.map { id =>
|
.map { id =>
|
||||||
val r = new Result
|
val r = new Result
|
||||||
r.setId(id.toLowerCase.trim)
|
r.setId(id.toLowerCase.trim)
|
||||||
|
@ -264,7 +265,8 @@ class ResolveEntitiesTest extends Serializable {
|
||||||
Source
|
Source
|
||||||
.fromInputStream(this.getClass.getResourceAsStream(s"publication"))
|
.fromInputStream(this.getClass.getResourceAsStream(s"publication"))
|
||||||
.mkString
|
.mkString
|
||||||
.linesWithSeparators.map(l => l.stripLineEnd)
|
.linesWithSeparators
|
||||||
|
.map(l => l.stripLineEnd)
|
||||||
.next(),
|
.next(),
|
||||||
classOf[Publication]
|
classOf[Publication]
|
||||||
)
|
)
|
||||||
|
|
|
@ -69,7 +69,8 @@ class ScholixGraphTest extends AbstractVocabularyTest {
|
||||||
getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/merge_result_scholix")
|
getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/merge_result_scholix")
|
||||||
)
|
)
|
||||||
.mkString
|
.mkString
|
||||||
val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators.map(l => l.stripLineEnd)
|
val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators
|
||||||
|
.map(l => l.stripLineEnd)
|
||||||
.sliding(2)
|
.sliding(2)
|
||||||
.map(s => (s.head, s(1)))
|
.map(s => (s.head, s(1)))
|
||||||
.map(p => (mapper.readValue(p._1, classOf[Relation]), mapper.readValue(p._2, classOf[ScholixSummary])))
|
.map(p => (mapper.readValue(p._1, classOf[Relation]), mapper.readValue(p._2, classOf[ScholixSummary])))
|
||||||
|
|
Loading…
Reference in New Issue