Some fixes to make the workflows run

Sandro La Bruzzo 2021-03-17 12:12:56 +01:00
parent 9cac6da9bd
commit cc5bbafa5d
7 changed files with 92 additions and 57 deletions


@@ -18,7 +18,7 @@ public class ModelConstants {
     public static final String PUBMED_CENTRAL_ID = "10|opendoar____::eda80a3d5b344bc40f3bc04f65b7a357";
     public static final String ARXIV_ID = "10|opendoar____::6f4922f45568161a8cdf4ad2299f6d23";
-    //VOCABULARY VALUE
+    // VOCABULARY VALUE
     public static final String ACCESS_RIGHT_OPEN = "OPEN";
     public static final String DNET_SUBJECT_TYPOLOGIES = "dnet:subject_classification_typologies";


@@ -180,7 +180,7 @@ case object Crossref2Oaf {
     // Ticket #6281 added pid to Instance
-    instance.setPid(result.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
+    instance.setPid(result.getPid)
     val has_review = (json \ "relation" \"has-review" \ "id")
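Editor's note: as reconstructed above, this one-line change replaces the DOI-only filtering of the instance pid with a plain copy of the record's pid list; the same change recurs in ConversionUtil and UnpayWallToOAF below. For readers unfamiliar with the Scala/Java collection bridging used on that line, here is a minimal, self-contained sketch of the filter-and-convert pattern the removed line implemented. `Qualifier` and `StructuredProperty` are simplified, hypothetical stand-ins, not the real dhp-schemas classes.

```scala
import scala.collection.JavaConverters._

// Simplified stand-ins for the dhp-schemas types referenced in the diff (hypothetical).
case class Qualifier(classid: String)
case class StructuredProperty(value: String, qualifier: Qualifier)

// The removed line kept only DOI-typed pids on the instance; the new line copies them all.
def doiPidsOnly(pids: java.util.List[StructuredProperty]): java.util.List[StructuredProperty] =
  pids.asScala.filter(p => p.qualifier.classid.equalsIgnoreCase("doi")).asJava
```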


@@ -172,7 +172,7 @@ case object ConversionUtil {
     i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava)
     // Ticket #6281 added pid to Instance
-    i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
+    i.setPid(pub.getPid)
     i.setCollectedfrom(createMAGCollectedFrom())
     pub.setInstance(List(i).asJava)


@@ -67,7 +67,7 @@ object SparkProcessMAG {
           MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber)
         } else
           mpa
-      }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
+      }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors"))
         .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors")
     logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")


@@ -65,11 +65,7 @@ object UnpayWallToOAF {
     val colour = get_color(is_oa, oaLocation, journal_is_oa)
     pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
-    //IMPORTANT
-    //The old method pub.setId(IdentifierFactory.createIdentifier(pub))
-    //will be replaced using IdentifierFactory
-    //pub.setId(generateIdentifier(pub, doi.toLowerCase))
-    pub.setId(IdentifierFactory.createIdentifier(pub))
     pub.setCollectedfrom(List(createUnpayWallCollectedFrom()).asJava)
@@ -87,7 +83,7 @@ object UnpayWallToOAF {
     i.setUrl(List(oaLocation.url.get).asJava)
     // Ticket #6281 added pid to Instance
-    i.setPid(pub.getPid.asScala.filter(p => p.getQualifier.getClassid.equalsIgnoreCase("doi")).asJava)
+    i.setPid(pub.getPid)
     if (oaLocation.license.isDefined)
       i.setLicense(asField(oaLocation.license.get))
@@ -104,6 +100,14 @@ object UnpayWallToOAF {
       i.setAccessright(a)
     }
     pub.setInstance(List(i).asJava)
+    //IMPORTANT
+    //The old method pub.setId(IdentifierFactory.createIdentifier(pub))
+    //will be replaced using IdentifierFactory
+    //pub.setId(generateIdentifier(pub, doi.toLowerCase))
+    val id = IdentifierFactory.createIdentifier(pub)
+    logger.info(id);
+    pub.setId(IdentifierFactory.createIdentifier(pub))
     pub
   }
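The two hunks above move the `pub.setId(IdentifierFactory.createIdentifier(pub))` call from just after `setPid` to after the instance has been fully populated. Assuming, as the retained comments suggest, that `IdentifierFactory` derives the identifier from the record's pids, calling it too early would compute an id from an incomplete record. A hypothetical illustration of that ordering concern follows; the `createIdentifier` below is a simplified stand-in, not the real eu.dnetlib implementation.

```scala
import java.security.MessageDigest
import scala.collection.mutable.ListBuffer

// Hypothetical record and id factory, only to illustrate why the call order matters.
final case class Pub(var id: String = null, pids: ListBuffer[(String, String)] = ListBuffer())

def createIdentifier(p: Pub): String = {
  // Pick a pid (here: the first DOI) and hash it, as an id factory typically would.
  val doi = p.pids.collectFirst { case ("doi", v) => v }.getOrElse("unknown")
  val md5 = MessageDigest.getInstance("MD5").digest(doi.getBytes("UTF-8"))
    .map("%02x".format(_)).mkString
  s"50|doi_________::$md5"
}

val pub = Pub()
val tooEarly = createIdentifier(pub)   // computed before the DOI is set -> hash of "unknown"
pub.pids += (("doi", "10.1000/xyz123"))
pub.id = createIdentifier(pub)         // computed after the pid is set -> stable, DOI-based id
println(s"early=$tooEarly\nfinal=${pub.id}")
```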


@@ -54,6 +54,11 @@
     </property>
     <!-- MAG Parameters -->
+    <property>
+        <name>MAGDumpPath</name>
+        <description>the MAG dump working path</description>
+    </property>
     <property>
         <name>inputPathMAG</name>
         <description>the MAG working path</description>
@@ -132,7 +137,10 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
 <arg>--master</arg><arg>yarn-cluster</arg>
@@ -147,6 +155,43 @@
     <move source="${inputPathCrossref}/crossref_ds_updated"
           target="${inputPathCrossref}/crossref_ds"/>
 </fs>
+<ok to="ResetMagWorkingPath"/>
+<error to="Kill"/>
+</action>
+
+<!-- MAG SECTION -->
+<action name="ResetMagWorkingPath">
+    <fs>
+        <delete path="${inputPathMAG}/dataset"/>
+        <delete path="${inputPathMAG}/process"/>
+    </fs>
+    <ok to="ConvertMagToDataset"/>
+    <error to="Kill"/>
+</action>
+
+<action name="ConvertMagToDataset">
+    <spark xmlns="uri:oozie:spark-action:0.2">
+        <master>yarn-cluster</master>
+        <mode>cluster</mode>
+        <name>Convert Mag to Dataset</name>
+        <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
+        <jar>dhp-doiboost-${projectVersion}.jar</jar>
+        <spark-opts>
+            --executor-memory=${sparkExecutorMemory}
+            --executor-cores=${sparkExecutorCores}
+            --driver-memory=${sparkDriverMemory}
+            --conf spark.sql.shuffle.partitions=3840
+            --conf spark.extraListeners=${spark2ExtraListeners}
+            --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+            --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+            --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+        </spark-opts>
+        <arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
+        <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
+        <arg>--master</arg><arg>yarn-cluster</arg>
+    </spark>
 <ok to="ConvertCrossrefToOAF"/>
 <error to="Kill"/>
 </action>
@@ -164,46 +209,15 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
 <arg>--targetPath</arg><arg>${workingPath}</arg>
 <arg>--master</arg><arg>yarn-cluster</arg>
 </spark>
-<ok to="ResetMagWorkingPath"/>
-<error to="Kill"/>
-</action>
-
-<!-- MAG SECTION -->
-<action name="ResetMagWorkingPath">
-    <fs>
-        <delete path="${inputPathMAG}/dataset"/>
-        <delete path="${inputPathMAG}/process"/>
-        <delete path="${inputPathMAG}/dataset"/>
-    </fs>
-    <ok to="ConvertMagToDataset"/>
-    <error to="Kill"/>
-</action>
-
-<action name="ConvertMagToDataset">
-    <spark xmlns="uri:oozie:spark-action:0.2">
-        <master>yarn-cluster</master>
-        <mode>cluster</mode>
-        <name>Convert Mag to Dataset</name>
-        <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
-        <jar>dhp-doiboost-${projectVersion}.jar</jar>
-        <spark-opts>
-            --executor-memory=${sparkExecutorMemory}
-            --executor-cores=${sparkExecutorCores}
-            --driver-memory=${sparkDriverMemory}
-            ${sparkExtraOPT}
-        </spark-opts>
-        <arg>--sourcePath</arg><arg>${inputPathMAG}/input</arg>
-        <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
-        <arg>--master</arg><arg>yarn-cluster</arg>
-    </spark>
 <ok to="ProcessMAG"/>
 <error to="Kill"/>
 </action>
@@ -216,11 +230,14 @@
 <class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
 <jar>dhp-doiboost-${projectVersion}.jar</jar>
 <spark-opts>
-    --executor-memory=${sparkExecutorMemory}
+    --executor-memory=${sparkExecutorIntersectionMemory}
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
 <arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
@@ -245,10 +262,14 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.sql.shuffle.partitions=3840
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
-<arg>--targetPath</arg><arg>${workingPath}</arg>
+<arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
 <arg>--master</arg><arg>yarn-cluster</arg>
 </spark>
 <ok to="ProcessORCID"/>
@@ -268,10 +289,13 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
-<arg>--targetPath</arg><arg>${workingPath}</arg>
+<arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
 <arg>--master</arg><arg>yarn-cluster</arg>
 </spark>
 <ok to="CreateDOIBoost"/>
@@ -291,11 +315,15 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.sql.shuffle.partitions=3840
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
-<arg>--affiliationPath</arg><arg>${inputPathMAG}/process/Affiliations</arg>
-<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/process/PaperAuthorAffiliations</arg>
+<arg>--affiliationPath</arg><arg>${inputPathMAG}/dataset/Affiliations</arg>
+<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/dataset/PaperAuthorAffiliations</arg>
 <arg>--workingPath</arg><arg>${workingPath}</arg>
 <arg>--master</arg><arg>yarn-cluster</arg>
 </spark>
@@ -316,7 +344,10 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.sql.shuffle.partitions=3840
-    ${sparkExtraOPT}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 </spark-opts>
 <arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
 <arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>


@@ -28,7 +28,7 @@ class UnpayWallMappingTest {
     if(p!= null) {
       assertTrue(p.getPid.size()==1)
-      logger.info(p.getId)
+      logger.info("ID :",p.getId)
     }
     assertNotNull(line)
     assertTrue(line.nonEmpty)
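One note on the test change above: if `logger` here is an slf4j `Logger` (the usual choice in this codebase), `logger.info("ID :", p.getId)` will log only the literal string, because slf4j substitutes arguments only where a `{}` placeholder appears. A minimal sketch of the difference; the id value is invented:

```scala
import org.slf4j.{Logger, LoggerFactory}

val logger: Logger = LoggerFactory.getLogger("UnpayWallMappingTest")
val id = "50|doi_________::abc"

logger.info("ID :", id)    // logs "ID :" - the argument is dropped (no placeholder)
logger.info("ID : {}", id) // logs "ID : 50|doi_________::abc"
```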