From 821540f94a0ae055c74cd642ec6921465465e8ae Mon Sep 17 00:00:00 2001
From: Miriam Baglioni
Date: Tue, 22 Oct 2024 10:13:30 +0200
Subject: [PATCH] [personEntity] updated the property file to also include the
 db parameters; the same for the workflow definition. Refactoring for
 compilation

---
 .../personentity/CoAuthorshipIterator.java    |  3 +-
 .../personentity/ExtractPerson.java           | 99 +++++++++----------
 .../personentity/as_parameters.json           | 25 +++++
 .../actionmanager/personentity/job.properties |  5 +-
 .../personentity/oozie_app/workflow.xml       | 16 +++
 .../orcid/ORCIDAuthorMatchersTest.scala       |  1 +
 6 files changed, 95 insertions(+), 54 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
index 94ac7ab28..131f3f466 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java
@@ -61,8 +61,7 @@ public class CoAuthorshipIterator implements Iterator<Relation> {
 	private Relation getRelation(String orcid1, String orcid2) {
 		String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
 		String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
-        Relation relation =
-            OafMapperUtils
+		Relation relation = OafMapperUtils
 			.getRelation(
 				source, target, ModelConstants.PERSON_PERSON_RELTYPE, ModelConstants.PERSON_PERSON_SUBRELTYPE,
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
index 960dfbe44..fb0621b6e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java
@@ -13,9 +13,12 @@
 import java.sql.SQLException;
 import java.util.*;
 import java.util.stream.Collectors;
-import eu.dnetlib.dhp.common.DbClient;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -27,13 +30,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.spark_project.jetty.util.StringUtil;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.orcid.model.Author;
 import eu.dnetlib.dhp.collection.orcid.model.Employment;
 import eu.dnetlib.dhp.collection.orcid.model.Work;
+import eu.dnetlib.dhp.common.DbClient;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -49,11 +52,6 @@
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import scala.Tuple2;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 public class ExtractPerson implements Serializable {
 	private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
 	private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'";
@@ -77,8 +75,7 @@ public class ExtractPerson implements Serializable {
 	public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE";
 
 	public static List<KeyValue> collectedfromOpenAIRE = OafMapperUtils
-        .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
-
+		.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
 
 	public static final DataInfo DATAINFO = OafMapperUtils
 		.dataInfo(
@@ -136,14 +133,15 @@ public class ExtractPerson implements Serializable {
 			spark -> {
 				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
 				extractInfoForActionSetFromORCID(spark, inputPath, workingDir);
-				extractInfoForActionSetFromProjects(spark, inputPath, workingDir, dbUrl, dbUser, dbPassword, workingDir + "/project", hdfsNameNode);
+				extractInfoForActionSetFromProjects(
+					spark, inputPath, workingDir, dbUrl, dbUser, dbPassword, workingDir + "/project", hdfsNameNode);
 				createActionSet(spark, outputPath, workingDir);
 			});
 
 	}
 
 	private static void extractInfoForActionSetFromProjects(SparkSession spark, String inputPath, String workingDir,
-        String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException {
+		String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException {
 
 		Configuration conf = new Configuration();
 		conf.set("fs.defaultFS", hdfsNameNode);
@@ -164,41 +162,40 @@ public class ExtractPerson implements Serializable {
 
 	public static Relation getRelationWithProject(ResultSet rs) {
 		try {
-            return getProjectRelation(rs.getString("project"), rs.getString("pid"),
-                rs.getString("role"));
+			return getProjectRelation(
+				rs.getString("project"), rs.getString("pid"),
+				rs.getString("role"));
 		} catch (final SQLException e) {
 			throw new RuntimeException(e);
 		}
-    }
+	}
 
 	private static Relation getProjectRelation(String project, String orcid, String role) {
-        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
-        String target = project.substring(0,14)
-            + IdentifierFactory.md5(project.substring(15));
-        List<KeyValue> properties = new ArrayList<>();
+		String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
+		String target = project.substring(0, 14)
+			+ IdentifierFactory.md5(project.substring(15));
+		List<KeyValue> properties = new ArrayList<>();
 
-        Relation relation = OafMapperUtils
-            .getRelation(
-                source, target, ModelConstants.PROJECT_PERSON_RELTYPE, ModelConstants.PROJECT_PERSON_SUBRELTYPE,
-                ModelConstants.PROJECT_PERSON_PARTICIPATES,
-                collectedfromOpenAIRE,
-                DATAINFO,
-                null);
-        relation.setValidated(true);
+		Relation relation = OafMapperUtils
+			.getRelation(
+				source, target, ModelConstants.PROJECT_PERSON_RELTYPE, ModelConstants.PROJECT_PERSON_SUBRELTYPE,
+				ModelConstants.PROJECT_PERSON_PARTICIPATES,
+				collectedfromOpenAIRE,
+				DATAINFO,
+				null);
+		relation.setValidated(true);
 
-        if (StringUtil.isNotBlank(role)) {
-            KeyValue kv = new KeyValue();
-            kv.setKey("role");
-            kv.setValue(role);
-            properties.add(kv);
-        }
-
-
-        if (!properties.isEmpty())
-            relation.setProperties(properties);
-        return relation;
+		if (StringUtil.isNotBlank(role)) {
+			KeyValue kv = new KeyValue();
+			kv.setKey("role");
+			kv.setValue(role);
+			properties.add(kv);
+		}
+		if (!properties.isEmpty())
+			relation.setProperties(properties);
+		return relation;
 	}
@@ -211,7 +208,7 @@ public class ExtractPerson implements Serializable {
 		}
 	}
 
-    private static void createActionSet(SparkSession spark,String outputPath, String workingDir) {
+	private static void createActionSet(SparkSession spark, String outputPath, String workingDir) {
 		Dataset<Person> people;
 		people = spark
@@ -221,7 +218,7 @@ public class ExtractPerson implements Serializable {
 				(MapFunction<String, Person>) value -> OBJECT_MAPPER
 					.readValue(value, Person.class),
 				Encoders.bean(Person.class));
-
+
 		people
 			.toJavaRDD()
 			.map(p -> new AtomicAction(p.getClass(), p))
@@ -235,10 +232,10 @@ public class ExtractPerson implements Serializable {
 					getRelations(spark, workingDir + "/affiliation")
 						.toJavaRDD()
 						.map(r -> new AtomicAction(r.getClass(), r)))
-            .union(
-                getRelations(spark, workingDir + "/project")
-                    .toJavaRDD()
-                    .map(r -> new AtomicAction(r.getClass(), r)))
+			.union(
+				getRelations(spark, workingDir + "/project")
+					.toJavaRDD()
+					.map(r -> new AtomicAction(r.getClass(), r)))
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
 					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
@@ -276,7 +273,7 @@ public class ExtractPerson implements Serializable {
 			.joinWith(authors, employmentDataset.col("orcid").equalTo(authors.col("orcid")))
 			.map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
 
-		//Mapping all the orcid profiles even if the profile has no visible works
+		// Mapping all the orcid profiles even if the profile has no visible works
 		authors.map((MapFunction<Author, Person>) op -> {
 			Person person = new Person();
@@ -509,13 +506,13 @@ public class ExtractPerson implements Serializable {
 			return null;
 		}
 		Relation relation = OafMapperUtils
-            .getRelation(
-                source, target, ModelConstants.RESULT_PERSON_RELTYPE,
-                ModelConstants.RESULT_PERSON_SUBRELTYPE,
-                ModelConstants.RESULT_PERSON_HASAUTHORED,
-                Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
-                DATAINFO,
-                null);
+			.getRelation(
+				source, target, ModelConstants.RESULT_PERSON_RELTYPE,
+				ModelConstants.RESULT_PERSON_SUBRELTYPE,
+				ModelConstants.RESULT_PERSON_HASAUTHORED,
+				Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
+				DATAINFO,
+				null);
 		relation.setValidated(true);
 		return relation;
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json
index 5175552e7..1894a6beb 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json
@@ -21,5 +21,30 @@
   "paramLongName": "workingDir",
   "paramDescription": "the hdfs name node",
   "paramRequired": false
+},
+{
+  "paramName": "pu",
+  "paramLongName": "postgresUrl",
+  "paramDescription": "the postgres URL",
+  "paramRequired": false
+},
+{
+  "paramName": "ps",
+  "paramLongName": "postgresUser",
+  "paramDescription": "the postgres user",
+  "paramRequired": false
+},
+{
+  "paramName": "pp",
+  "paramLongName": "postgresPassword",
+  "paramDescription": "the postgres password",
+  "paramRequired": false
+},
+{
+  "paramName": "nn",
+  "paramLongName": "hdfsNameNode",
+  "paramDescription": "the hdfs name node",
+  "paramRequired": false
 }
+
 ]
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties
index d2269718c..ac63d8a68 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties
@@ -1,2 +1,5 @@
 inputPath=/data/orcid_2023/tables/
-outputPath=/user/miriam.baglioni/peopleAS
\ No newline at end of file
+outputPath=/user/miriam.baglioni/peopleAS
+postgresUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus
+postgresUser=dnet
+postgresPassword=dnetPwd
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml
index 166e7bb9c..5b613a76a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml
@@ -9,6 +9,18 @@
 			<name>outputPath</name>
 			<description>the path where to store the actionset</description>
 		</property>
+		<property>
+			<name>postgresUrl</name>
+			<description>the postgres URL</description>
+		</property>
+		<property>
+			<name>postgresUser</name>
+			<description>the postgres user</description>
+		</property>
+		<property>
+			<name>postgresPassword</name>
+			<description>the postgres password</description>
+		</property>
 		<property>
 			<name>sparkDriverMemory</name>
 			<description>memory for driver process</description>
@@ -102,6 +114,10 @@
 			<arg>--inputPath</arg><arg>${inputPath}</arg>
 			<arg>--outputPath</arg><arg>${outputPath}</arg>
 			<arg>--workingDir</arg><arg>${workingDir}</arg>
+			<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+			<arg>--postgresUrl</arg><arg>${postgresUrl}</arg>
+			<arg>--postgresUser</arg><arg>${postgresUser}</arg>
+			<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
 		</spark>
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala
index 4e5ad5365..eece56b74 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala
@@ -31,6 +31,7 @@ class ORCIDAuthorMatchersTest {
     assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin"))
     // assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
   }
+
   @Test def testDocumentationNames(): Unit = {
     assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller-Jones"))
   }
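
Note on how the new pieces fit together: the three postgres entries added to job.properties are passed through workflow.xml into ExtractPerson, which connects to the dnet_openaireplus database, runs QUERY against the project_person table, and maps each row to a project-person Relation via getRelationWithProject before writing it under workingDir + "/project". Below is a minimal, self-contained sketch of that query loop using plain JDBC rather than the project's eu.dnetlib.dhp.common.DbClient wrapper; the class name ProjectPersonQuerySketch and the Consumer-based callback are illustrative assumptions, not code from this patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.function.Consumer;

// Hypothetical stand-in for the DbClient-based extraction in ExtractPerson:
// stream the project_person rows restricted to ORCID pids and hand each row
// to a ResultSet consumer (in the patch, getRelationWithProject turns a row
// into a project-person Relation).
public class ProjectPersonQuerySketch {

    private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'";

    public static void streamRows(String postgresUrl, String postgresUser, String postgresPassword,
        Consumer<ResultSet> rowConsumer) throws SQLException {
        try (Connection con = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword)) {
            // Disable autocommit so the postgres driver honours the fetch size
            // and streams rows instead of buffering the whole table in memory.
            con.setAutoCommit(false);
            try (Statement stmt = con.createStatement()) {
                stmt.setFetchSize(100);
                try (ResultSet rs = stmt.executeQuery(QUERY)) {
                    while (rs.next()) {
                        rowConsumer.accept(rs);
                    }
                }
            }
        }
    }
}

With the wiring in this patch, rowConsumer would amount to something like rs -> relations.add(getRelationWithProject(rs)); the collected relations are then serialized to HDFS and unioned into the action set by createActionSet.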