forked from D-Net/dnet-hadoop
Merge branch 'stable_ids' of https://code-repo.d4science.org/D-Net/dnet-hadoop into stable_ids
This commit is contained in:
commit
75144dacb3
|
@ -10,12 +10,12 @@ import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import com.google.common.collect.HashBiMap;
|
import com.google.common.collect.HashBiMap;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
|
||||||
|
@ -60,6 +60,37 @@ public class IdentifierFactory implements Serializable {
|
||||||
return pidFromInstance(pid, collectedFrom).distinct().collect(Collectors.toList());
|
return pidFromInstance(pid, collectedFrom).distinct().collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T extends Result> String createDOIBoostIdentifier(T entity) {
|
||||||
|
if (entity == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
StructuredProperty pid = null;
|
||||||
|
if (entity.getPid() != null) {
|
||||||
|
pid = entity
|
||||||
|
.getPid()
|
||||||
|
.stream()
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.filter(s -> s.getQualifier() != null && "doi".equalsIgnoreCase(s.getQualifier().getClassid()))
|
||||||
|
.filter(IdentifierFactory::pidFilter)
|
||||||
|
.findAny()
|
||||||
|
.orElse(null);
|
||||||
|
} else {
|
||||||
|
if (entity.getInstance() != null) {
|
||||||
|
pid = entity
|
||||||
|
.getInstance()
|
||||||
|
.stream()
|
||||||
|
.filter(i -> i.getPid() != null)
|
||||||
|
.flatMap(i -> i.getPid().stream())
|
||||||
|
.filter(IdentifierFactory::pidFilter)
|
||||||
|
.findAny()
|
||||||
|
.orElse(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pid != null)
|
||||||
|
return idFromPid(entity, pid, true);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an identifier from the most relevant PID (if available) provided by a known PID authority in the given
|
* Creates an identifier from the most relevant PID (if available) provided by a known PID authority in the given
|
||||||
* entity T. Returns entity.id when none of the PIDs meet the selection criteria is available.
|
* entity T. Returns entity.id when none of the PIDs meet the selection criteria is available.
|
||||||
|
|
|
@ -196,6 +196,8 @@ object DoiBoostMappingUtil {
|
||||||
//Case empty publication
|
//Case empty publication
|
||||||
if (publication == null)
|
if (publication == null)
|
||||||
return false
|
return false
|
||||||
|
if (publication.getId == null || publication.getId.isEmpty)
|
||||||
|
return false
|
||||||
|
|
||||||
//Case publication with no title
|
//Case publication with no title
|
||||||
if (publication.getTitle == null || publication.getTitle.size == 0)
|
if (publication.getTitle == null || publication.getTitle.size == 0)
|
||||||
|
|
|
@ -180,7 +180,7 @@ case object Crossref2Oaf {
|
||||||
|
|
||||||
|
|
||||||
// Ticket #6281 added pid to Instance
|
// Ticket #6281 added pid to Instance
|
||||||
instance.setPid(result.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
|
instance.setPid(result.getPid)
|
||||||
|
|
||||||
val has_review = (json \ "relation" \"has-review" \ "id")
|
val has_review = (json \ "relation" \"has-review" \ "id")
|
||||||
|
|
||||||
|
@ -206,6 +206,9 @@ case object Crossref2Oaf {
|
||||||
val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct
|
val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct
|
||||||
if (links.nonEmpty)
|
if (links.nonEmpty)
|
||||||
instance.setUrl(links.asJava)
|
instance.setUrl(links.asJava)
|
||||||
|
result.setId(IdentifierFactory.createDOIBoostIdentifier(result))
|
||||||
|
if (result.getId== null)
|
||||||
|
return null
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,6 +243,8 @@ case object Crossref2Oaf {
|
||||||
return List()
|
return List()
|
||||||
val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"));
|
val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"));
|
||||||
mappingResult(result, json, cOBJCategory)
|
mappingResult(result, json, cOBJCategory)
|
||||||
|
if (result == null)
|
||||||
|
return List()
|
||||||
|
|
||||||
|
|
||||||
val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List())
|
val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List())
|
||||||
|
|
|
@ -172,7 +172,7 @@ case object ConversionUtil {
|
||||||
i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava)
|
i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava)
|
||||||
|
|
||||||
// Ticket #6281 added pid to Instance
|
// Ticket #6281 added pid to Instance
|
||||||
i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
|
i.setPid(pub.getPid)
|
||||||
|
|
||||||
i.setCollectedfrom(createMAGCollectedFrom())
|
i.setCollectedfrom(createMAGCollectedFrom())
|
||||||
pub.setInstance(List(i).asJava)
|
pub.setInstance(List(i).asJava)
|
||||||
|
@ -197,8 +197,8 @@ case object ConversionUtil {
|
||||||
//IMPORTANT
|
//IMPORTANT
|
||||||
//The old method result.setId(generateIdentifier(result, doi))
|
//The old method result.setId(generateIdentifier(result, doi))
|
||||||
//will be replaced using IdentifierFactory
|
//will be replaced using IdentifierFactory
|
||||||
pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
|
|
||||||
pub.setId(IdentifierFactory.createIdentifier(pub))
|
pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub))
|
||||||
|
|
||||||
val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
|
val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
|
||||||
val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title")
|
val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title")
|
||||||
|
|
|
@ -67,7 +67,7 @@ object SparkProcessMAG {
|
||||||
MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber)
|
MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber)
|
||||||
} else
|
} else
|
||||||
mpa
|
mpa
|
||||||
}).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
|
}).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors"))
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors")
|
||||||
|
|
||||||
logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
|
logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package eu.dnetlib.doiboost.orcid
|
package eu.dnetlib.doiboost.orcid
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
|
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
|
||||||
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
||||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||||
|
@ -49,7 +50,11 @@ object ORCIDToOAF {
|
||||||
val pub:Publication = new Publication
|
val pub:Publication = new Publication
|
||||||
pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava)
|
pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava)
|
||||||
pub.setDataInfo(generateDataInfo())
|
pub.setDataInfo(generateDataInfo())
|
||||||
pub.setId(generateIdentifier(pub, doi.toLowerCase))
|
|
||||||
|
pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub))
|
||||||
|
if (pub.getId == null)
|
||||||
|
return null
|
||||||
|
|
||||||
try{
|
try{
|
||||||
|
|
||||||
val l:List[Author]= input.getAuthors.asScala.map(a=> {
|
val l:List[Author]= input.getAuthors.asScala.map(a=> {
|
||||||
|
|
|
@ -55,7 +55,6 @@ object UnpayWallToOAF {
|
||||||
|
|
||||||
val doi = (json \"doi").extract[String]
|
val doi = (json \"doi").extract[String]
|
||||||
|
|
||||||
|
|
||||||
val is_oa = (json\ "is_oa").extract[Boolean]
|
val is_oa = (json\ "is_oa").extract[Boolean]
|
||||||
|
|
||||||
val journal_is_oa= (json\ "journal_is_oa").extract[Boolean]
|
val journal_is_oa= (json\ "journal_is_oa").extract[Boolean]
|
||||||
|
@ -63,14 +62,6 @@ object UnpayWallToOAF {
|
||||||
val oaLocation:OALocation = (json \ "best_oa_location").extractOrElse[OALocation](null)
|
val oaLocation:OALocation = (json \ "best_oa_location").extractOrElse[OALocation](null)
|
||||||
|
|
||||||
val colour = get_color(is_oa, oaLocation, journal_is_oa)
|
val colour = get_color(is_oa, oaLocation, journal_is_oa)
|
||||||
pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
|
|
||||||
|
|
||||||
//IMPORTANT
|
|
||||||
//The old method pub.setId(IdentifierFactory.createIdentifier(pub))
|
|
||||||
//will be replaced using IdentifierFactory
|
|
||||||
//pub.setId(generateIdentifier(pub, doi.toLowerCase))
|
|
||||||
pub.setId(IdentifierFactory.createIdentifier(pub))
|
|
||||||
|
|
||||||
|
|
||||||
pub.setCollectedfrom(List(createUnpayWallCollectedFrom()).asJava)
|
pub.setCollectedfrom(List(createUnpayWallCollectedFrom()).asJava)
|
||||||
pub.setDataInfo(generateDataInfo())
|
pub.setDataInfo(generateDataInfo())
|
||||||
|
@ -86,12 +77,9 @@ object UnpayWallToOAF {
|
||||||
// i.setAccessright(getOpenAccessQualifier())
|
// i.setAccessright(getOpenAccessQualifier())
|
||||||
i.setUrl(List(oaLocation.url.get).asJava)
|
i.setUrl(List(oaLocation.url.get).asJava)
|
||||||
|
|
||||||
// Ticket #6281 added pid to Instance
|
|
||||||
i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
|
|
||||||
|
|
||||||
if (oaLocation.license.isDefined)
|
if (oaLocation.license.isDefined)
|
||||||
i.setLicense(asField(oaLocation.license.get))
|
i.setLicense(asField(oaLocation.license.get))
|
||||||
|
pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
|
||||||
|
|
||||||
// Ticket #6282 Adding open Access Colour
|
// Ticket #6282 Adding open Access Colour
|
||||||
if (colour.isDefined) {
|
if (colour.isDefined) {
|
||||||
|
@ -102,8 +90,15 @@ object UnpayWallToOAF {
|
||||||
a.setSchemename(ModelConstants.DNET_ACCESS_MODES)
|
a.setSchemename(ModelConstants.DNET_ACCESS_MODES)
|
||||||
a.setOpenAccessRoute(colour.get)
|
a.setOpenAccessRoute(colour.get)
|
||||||
i.setAccessright(a)
|
i.setAccessright(a)
|
||||||
|
i.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
|
||||||
}
|
}
|
||||||
pub.setInstance(List(i).asJava)
|
pub.setInstance(List(i).asJava)
|
||||||
|
|
||||||
|
//IMPORTANT
|
||||||
|
//The old method pub.setId(IdentifierFactory.createIdentifier(pub))
|
||||||
|
//will be replaced using IdentifierFactory
|
||||||
|
//pub.setId(generateIdentifier(pub, doi.toLowerCase))
|
||||||
|
pub.setId(IdentifierFactory.createDOIBoostIdentifier(pub))
|
||||||
pub
|
pub
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,11 @@
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<!-- MAG Parameters -->
|
<!-- MAG Parameters -->
|
||||||
|
<property>
|
||||||
|
<name>MAGDumpPath</name>
|
||||||
|
<description>the MAG dump working path</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>inputPathMAG</name>
|
<name>inputPathMAG</name>
|
||||||
<description>the MAG working path</description>
|
<description>the MAG working path</description>
|
||||||
|
@ -132,7 +137,10 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
|
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
@ -147,6 +155,43 @@
|
||||||
<move source="${inputPathCrossref}/crossref_ds_updated"
|
<move source="${inputPathCrossref}/crossref_ds_updated"
|
||||||
target="${inputPathCrossref}/crossref_ds"/>
|
target="${inputPathCrossref}/crossref_ds"/>
|
||||||
</fs>
|
</fs>
|
||||||
|
<ok to="ResetMagWorkingPath"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
<!-- MAG SECTION -->
|
||||||
|
<action name="ResetMagWorkingPath">
|
||||||
|
<fs>
|
||||||
|
<delete path="${inputPathMAG}/dataset"/>
|
||||||
|
<delete path="${inputPathMAG}/process"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="ConvertMagToDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ConvertMagToDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Convert Mag to Dataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
<ok to="ConvertCrossrefToOAF"/>
|
<ok to="ConvertCrossrefToOAF"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
@ -164,46 +209,15 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
|
<arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
|
||||||
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="ResetMagWorkingPath"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<!-- MAG SECTION -->
|
|
||||||
<action name="ResetMagWorkingPath">
|
|
||||||
<fs>
|
|
||||||
<delete path="${inputPathMAG}/dataset"/>
|
|
||||||
<delete path="${inputPathMAG}/process"/>
|
|
||||||
<delete path="${inputPathMAG}/dataset"/>
|
|
||||||
</fs>
|
|
||||||
<ok to="ConvertMagToDataset"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="ConvertMagToDataset">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn-cluster</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Convert Mag to Dataset</name>
|
|
||||||
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
|
|
||||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
${sparkExtraOPT}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--sourcePath</arg><arg>${inputPathMAG}/input</arg>
|
|
||||||
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
|
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="ProcessMAG"/>
|
<ok to="ProcessMAG"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
@ -216,11 +230,14 @@
|
||||||
<class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
|
<class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
|
||||||
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory=${sparkExecutorMemory}
|
--executor-memory=${sparkExecutorIntersectionMemory}
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
|
<arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
|
||||||
<arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
|
<arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
|
||||||
|
@ -245,10 +262,14 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
|
<arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
|
||||||
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
<arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="ProcessORCID"/>
|
<ok to="ProcessORCID"/>
|
||||||
|
@ -268,10 +289,13 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
|
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
|
||||||
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
<arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="CreateDOIBoost"/>
|
<ok to="CreateDOIBoost"/>
|
||||||
|
@ -291,11 +315,15 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
|
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
|
||||||
<arg>--affiliationPath</arg><arg>${inputPathMAG}/process/Affiliations</arg>
|
<arg>--affiliationPath</arg><arg>${inputPathMAG}/dataset/Affiliations</arg>
|
||||||
<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/process/PaperAuthorAffiliations</arg>
|
<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/dataset/PaperAuthorAffiliations</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -316,7 +344,10 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
${sparkExtraOPT}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
|
<arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
|
||||||
<arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>
|
<arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>
|
||||||
|
|
|
@ -22,13 +22,11 @@ class UnpayWallMappingTest {
|
||||||
|
|
||||||
|
|
||||||
for (line <-Ilist.lines) {
|
for (line <-Ilist.lines) {
|
||||||
|
|
||||||
|
|
||||||
val p = UnpayWallToOAF.convertToOAF(line)
|
val p = UnpayWallToOAF.convertToOAF(line)
|
||||||
|
|
||||||
if(p!= null) {
|
if(p!= null) {
|
||||||
assertTrue(p.getPid.size()==1)
|
assertTrue(p.getInstance().size()==1)
|
||||||
logger.info(p.getId)
|
logger.info(s"ID : ${p.getId}")
|
||||||
}
|
}
|
||||||
assertNotNull(line)
|
assertNotNull(line)
|
||||||
assertTrue(line.nonEmpty)
|
assertTrue(line.nonEmpty)
|
||||||
|
|
Loading…
Reference in New Issue