diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index c29c046998..960dfbe441 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -2,13 +2,18 @@ package eu.dnetlib.dhp.actionmanager.personentity; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static org.apache.spark.sql.functions.*; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; +import eu.dnetlib.dhp.common.DbClient; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; @@ -22,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spark_project.jetty.util.StringUtil; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -43,9 +49,14 @@ import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + public class ExtractPerson implements Serializable { private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); - + private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String OPENAIRE_PREFIX = "openaire____"; private static final String SEPARATOR = "::"; @@ -62,6 +73,12 @@ public class ExtractPerson implements Serializable { private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556"; + public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE"; + + public static List collectedfromOpenAIRE = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + public static final DataInfo DATAINFO = OafMapperUtils .dataInfo( @@ -106,19 +123,130 @@ public class ExtractPerson implements Serializable { final String workingDir = parser.get("workingDir"); log.info("workingDir {}", workingDir); + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + + final String hdfsNameNode = parser.get("hdfsNameNode"); + SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - createActionSet(spark, inputPath, outputPath, workingDir); + extractInfoForActionSetFromORCID(spark, inputPath, workingDir); + extractInfoForActionSetFromProjects(spark, inputPath, workingDir, dbUrl, dbUser, dbPassword, workingDir + "/project", hdfsNameNode); + createActionSet(spark, outputPath, workingDir); }); } - private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { + private static void extractInfoForActionSetFromProjects(SparkSession spark, String inputPath, String workingDir, + String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fos = fileSystem.create(hdfsWritePath); + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) { + dbClient.processResults(QUERY, rs -> writeRelation(getRelationWithProject(rs), writer)); + } + + } catch (IOException e) { + e.printStackTrace(); + } + + } + + public static Relation getRelationWithProject(ResultSet rs) { + try { + return getProjectRelation(rs.getString("project"), rs.getString("pid"), + rs.getString("role")); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + private static Relation getProjectRelation(String project, String orcid, String role) { + + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); + String target = project.substring(0,14) + + IdentifierFactory.md5(project.substring(15)); + List properties = new ArrayList<>(); + + Relation relation = OafMapperUtils + .getRelation( + source, target, ModelConstants.PROJECT_PERSON_RELTYPE, ModelConstants.PROJECT_PERSON_SUBRELTYPE, + ModelConstants.PROJECT_PERSON_PARTICIPATES, + collectedfromOpenAIRE, + DATAINFO, + null); + relation.setValidated(true); + + if (StringUtil.isNotBlank(role)) { + KeyValue kv = new KeyValue(); + kv.setKey("role"); + kv.setValue(role); + properties.add(kv); + } + + + if (!properties.isEmpty()) + relation.setProperties(properties); + return relation; + + + } + + protected static void writeRelation(final Relation relation, BufferedWriter writer) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(relation)); + writer.newLine(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private static void createActionSet(SparkSession spark,String outputPath, String workingDir) { + + Dataset people; + people = spark + .read() + .textFile(workingDir + "/people") + .map( + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, Person.class), + Encoders.bean(Person.class)); + + people + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/coauthorship") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/affiliation") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/project") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + } + + private static void extractInfoForActionSetFromORCID(SparkSession spark, String inputPath, String workingDir) { Dataset authors = spark .read() .parquet(inputPath + "Authors") @@ -149,7 +277,7 @@ public class ExtractPerson implements Serializable { .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); //Mapping all the orcid profiles even if the profile has no visible works - Dataset people; + authors.map((MapFunction) op -> { Person person = new Person(); person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); @@ -257,34 +385,6 @@ public class ExtractPerson implements Serializable { .option("compression", "gzip") .mode(SaveMode.Overwrite) .json(workingDir + "/affiliation"); - - people = spark - .read() - .textFile(workingDir + "/people") - .map( - (MapFunction) value -> OBJECT_MAPPER - .readValue(value, Person.class), - Encoders.bean(Person.class)); - - people.show(false); - people - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .union( - getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r))) - .union( - getRelations(spark, workingDir + "/coauthorship") - .toJavaRDD() - .map(r -> new AtomicAction(r.getClass(), r))) - .union( - getRelations(spark, workingDir + "/affiliation") - .toJavaRDD() - .map(r -> new AtomicAction(r.getClass(), r))) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile( - outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); } private static Dataset getRelations(SparkSession spark, String path) {