forked from D-Net/dnet-hadoop
This commit is contained in:
parent
8752d275fa
commit
0d8e496a63
|
@ -2,7 +2,6 @@
|
|||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -67,7 +66,6 @@ public class SparkOrcidToResultFromSemRelJob {
|
|||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
|
||||
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.bulktag.community.ResultTagger;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 09/12/23
|
||||
*/
|
||||
public class AppendNewRelations implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AppendNewRelations.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
AppendNewRelations.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_newrelation_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkHiveSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> appendNewRelation(spark, inputPath, outputPath));
|
||||
}
|
||||
|
||||
private static void appendNewRelation(SparkSession spark, String inputPath, String outputPath) {
|
||||
|
||||
readPath(spark, inputPath + "publication/relation", Relation.class)
|
||||
.union(readPath(spark, inputPath + "dataset/relation", Relation.class))
|
||||
.union(readPath(spark, inputPath + "otherresearchproduct/relation", Relation.class))
|
||||
.union(readPath(spark, inputPath + "software/relation", Relation.class))
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
}
|
||||
|
||||
}
|
|
@ -52,10 +52,13 @@ public class PrepareResultInstRepoAssociation {
|
|||
String inputPath = parser.get("sourcePath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
final String datasourceOrganizationPath = parser.get("datasourceOrganizationPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath : {}", workingPath);
|
||||
|
||||
final String datasourceOrganizationPath = workingPath + "/preparedInfo/datasourceOrganization";
|
||||
log.info("datasourceOrganizationPath {}: ", datasourceOrganizationPath);
|
||||
|
||||
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
|
||||
final String alreadyLinkedPath = workingPath + "/preparedInfo/alreadyLinked";
|
||||
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
|
||||
|
||||
List<String> blacklist = Optional
|
||||
|
|
|
@ -119,7 +119,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
|
|||
"left_outer")
|
||||
.flatMap(createRelationFn(), Encoders.bean(Relation.class))
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
}
|
||||
|
|
|
@ -11,16 +11,11 @@
|
|||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName":"dop",
|
||||
"paramLongName":"datasourceOrganizationPath",
|
||||
"paramDescription": "path where to store/find association from datasource and organization",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName":"alp",
|
||||
"paramLongName":"alreadyLinkedPath",
|
||||
"paramDescription": "path where to store/find already linked results and organizations",
|
||||
"paramName":"wp",
|
||||
"paramLongName":"workingPath",
|
||||
"paramDescription": "the working path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched
|
||||
resumeFrom=OrcidPropagation
|
||||
resumeFrom=AffiliationInstitutionalRepository
|
||||
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
|
||||
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
|
||||
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
|
||||
|
@ -7,7 +7,20 @@ datasourceWhitelistForCountryPropagation=10|openaire____::3795d6478e30e2c9f787d4
|
|||
allowedtypes=pubsrepository::institutional
|
||||
outputPath=/tmp/miriam/enrichment_one_step
|
||||
organizationtoresultcommunitymap={"20|corda__h2020::3fb05a9524c3f790391261347852f638":["mes","euromarine"], "20|corda__h2020::e8dbe14cca9bf6fce09d468872f813f8":["mes","euromarine"], "20|snsf________::9b253f265e3bef5cae6d881fdf61aceb":["mes","euromarine"],"20|ukri________::e054eea0a47665af8c3656b5785ccf76":["mes","euromarine"],"20|corda__h2020::edc18d67c9b11fb616ca9f6e1db1b151":["mes","euromarine"],"20|ukri________::d5736d9da90521ddcdc7828a05a85e9a":["mes","euromarine"],"20|corda__h2020::f5d418d3aa1cf817ddefcc3fdc039f27":["mes","euromarine"],"20|snsf________::8fa091f8f25a846779acb4ea97b50aef":["mes","euromarine"],"20|corda__h2020::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|corda_______::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|snsf________::31d0a100e54e3cdb3c6f52d91e638c78":["mes","euromarine"],"20|corda__h2020::ea379ef91b8cc86f9ac5edc4169292db":["mes","euromarine"],"20|corda__h2020::f75ee2ee48e5cb0ec8c8d30aaa8fef70":["mes","euromarine"],"20|ukri________::e16010089551a1a9182a94604fc0ea59":["mes","euromarine"],"20|corda__h2020::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|corda_______::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|grid________::b2cbbf5eadbbf87d534b022bad3191d7":["mes","euromarine"],"20|snsf________::74730ef1439d7f7636a8be58a6b471b8":["mes","euromarine"],"20|nsf_________::ad72e19043a5a467e35f9b444d11563e":["mes","euromarine"],"20|ukri________::0fc3e92500290902a2d38ec2445e74c3":["mes","euromarine"],"20|grid________::ad2c29905da0eb3c06b3fa80cacd89ea":["mes","euromarine"],"20|corda__h2020::30b53e4d63d3724f00acb9cbaca40860":["mes","euromarine"],"20|corda__h2020::f60f84bee14ad93f0db0e49af1d5c317":["mes","euromarine"], "20|corda__h2020::7bf251ac3765b5e89d82270a1763d09f":["mes","euromarine"], "20|corda__h2020::65531bd11be9935948c7f2f4db1c1832":["mes","euromarine"], "20|corda__h2020::e0e98f86bbc76638bbb72a8fe2302946":["mes","euromarine"], "20|snsf________::3eb43582ac27601459a8d8b3e195724b":["mes","euromarine"], "20|corda__h2020::af2481dab65d06c8ea0ae02b5517b9b6":["mes","euromarine"], "20|corda__h2020::c19d05cfde69a50d3ebc89bd0ee49929":["mes","euromarine"], "20|corda__h2020::af0bfd9fc09f80d9488f56d71a9832f0":["mes","euromarine"], "20|ukri________::f33c02afb0dc66c49d0ed97ca5dd5cb0":["beopen"], "20|grid________::a867f78acdc5041b34acfe4f9a349157":["beopen"], "20|grid________::7bb116a1a9f95ab812bf9d2dea2be1ff":["beopen"], "20|corda__h2020::6ab0e0739dbe625b99a2ae45842164ad":["beopen"], "20|corda__h2020::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda_______::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda__h2020::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::15911e01e9744d57205825d77c218737":["beopen"], "20|opendoar____::056a41e24e2a9a67215e87bbee6a80ab":["beopen"], "20|opendoar____::7f67f2e6c6fbb0628f8160fcd3d92ae3":["beopen"], "20|grid________::a8ecfd7c084e561168bcbe6bf0daf3e3":["beopen"], "20|corda_______::7bbe6cc5d8ec1864739a04b0d020c9e9":["beopen"], "20|corda_______::3ff558e30c2e434d688539548300b050":["beopen"], "20|corda__h2020::5ffee5b3b83b33a8cf0e046877bd3a39":["beopen"], "20|corda__h2020::5187217e2e806a6df3579c46f82401bc":["beopen"], "20|grid________::5fa7e2709bcd945e26bfa18689adeec1":["beopen"], "20|corda_______::d8696683c53027438031a96ad27c3c07":["beopen"], "20|corda__h2020::d8696683c53027438031a96ad27c3c07":["beopen"], "20|ukri________::23a79ebdfa59790864e4a485881568c1":["beopen"], "20|corda__h2020::b76cf8fe49590a966953c37e18608af9":["beopen"], "20|grid________::d2f0204126ee709244a488a4cd3b91c2":["beopen"], "20|corda__h2020::05aba9d2ed17533d15221e5655ac11e6":["beopen"], "20|grid________::802401579481dc32062bdee69f5e6a34":["beopen"], "20|corda__h2020::3f6d9d54cac975a517ba6b252c81582d":["beopen"], "20|openorgs____::d11f981828c485cd23d93f7f24f24db1":["eut"], "20|openorgs____::e66fe5dd092752e1dd6fd29fc699933a":["eut"], "20|openorgs____::526468206bca24c1c90da6a312295cf4":["eut"], "20|openorgs____::08e311e656e65ccb32e07c66b15b6ff7":["eut"], "20|openorgs____::55a1f889758964b77682904218fdb298":["eut"], "20|openorgs____::530092b6970d60a5329beb9f39e8d7d4":["eut"], "20|openorgs____::aadafa39392b3e200102596a3a4aad9d":["eut"], "20|openorgs____::c3fe999c74fad308132b8a5971367dce":["eut"], "20|openorgs____::1624ff7c01bb641b91f4518539a0c28a":["aurora"], "20|openorgs____::cdda7cfe17c89eb50628ec2eb1f8acd2":["aurora"], "20|openorgs____::818b75030e0e40612d69e049843ede7e":["aurora"], "20|openorgs____::0b0102bae51f4f4ef5ba57fbe1523b92":["aurora"], "20|openorgs____::ed47496b44722f0e9d7b98898189be0d":["aurora"], "20|openorgs____::eb0669daa9efeb898a3090d8aac7c953":["aurora"], "20|openorgs____::eb391317ed0dc684aa81ac16265de041":["aurora"], "20|openorgs____::f7cfcc98245e22c7d6e321cde930e746":["aurora"], "20|openorgs____::f33179d3306ba2599f7a898b056b604f":["aurora"], "20|pending_org_::75c41e6dd18466709ef359323d96fa05":["aurora"]}
|
||||
pathMap={"author" : "$['author'][*]['fullname']", "title" : "$['title'][*]['value']", "orcid":"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']", "contributor" : "$['contributor'][*]['value']", "description" : "$['description'][*]['value']"}
|
||||
pathMap ={"author":"$['author'][*]['fullname']", \
|
||||
"title":"$['title'][*]['value']",\
|
||||
"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\
|
||||
"orcid_pending":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']" ,\
|
||||
"contributor" : "$['contributor'][*]['value']",\
|
||||
"description" : "$['description'][*]['value']",\
|
||||
"subject" :"$['subject'][*]['value']" , \
|
||||
"fos" : "$['subject'][?(@['qualifier']['classid']=='FOS')].value" ,\
|
||||
"sdg" : "$['subject'][?(@['qualifier']['classid']=='SDG')].value",\
|
||||
"journal":"$['journal'].name",\
|
||||
"hostedby":"$['instance'][*]['hostedby']['key']",\
|
||||
"collectedfrom":"$['instance'][*]['collectedfrom']['key']",\
|
||||
"publisher":"$['publisher'].value",\
|
||||
"publicationyear":"$['dateofacceptance'].value"}
|
||||
blacklist=empty
|
||||
allowedpids=orcid;orcid_pending
|
||||
baseURL = https://services.openaire.eu/openaire/community/
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/affiliationInstRepo</arg>
|
||||
<arg>--blacklist</arg><arg>${blacklist}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
</spark>
|
||||
<ok to="fork_join_apply_resulttoorganization_propagation"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -78,7 +79,7 @@
|
|||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
|
||||
<arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/affiliationinstrepo/publication/relation</arg>
|
||||
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/datasourceOrganization</arg>
|
||||
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/alreadyLinked</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
|
@ -107,7 +108,7 @@
|
|||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/affiliationinstrepo/dataset/relation</arg>
|
||||
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/datasourceOrganization</arg>
|
||||
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/alreadyLinked</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
|
@ -136,7 +137,7 @@
|
|||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
|
||||
<arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/affiliationinstrepo/otherresearchproduct/relation</arg>
|
||||
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/datasourceOrganization</arg>
|
||||
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/alreadyLinked</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
|
@ -165,7 +166,7 @@
|
|||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
|
||||
<arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/affiliationinstrepo/software/relation</arg>
|
||||
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/datasourceOrganization</arg>
|
||||
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/alreadyLinked</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
|
@ -175,7 +176,32 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait2" to="End"/>
|
||||
<join name="wait2" to="append_new_relations"/>
|
||||
|
||||
<action name="append_new_relations">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>append new relations</name>
|
||||
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.AppendNewRelations</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.dynamicAllocation.enabled=true
|
||||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||
</spark-opts>
|
||||
<arg>--outputPath</arg><arg>${sourcePath}/relation</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingDir}/affiliationinstrepo/</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
|
|
Loading…
Reference in New Issue