orcid-no-doi #43

Merged
claudio.atzori merged 45 commits from enrico.ottonello/dnet-hadoop:orcid-no-doi into master 2020-12-02 10:55:12 +01:00
1 changed files with 33 additions and 29 deletions
Showing only changes of commit 1f861f2b0d - Show all commits

View File

@ -1,14 +1,21 @@
package eu.dnetlib.doiboost.orcidnodoi; package eu.dnetlib.doiboost.orcidnodoi;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import java.io.IOException; import com.google.gson.JsonElement;
import java.util.Objects; import com.google.gson.JsonParser;
import java.util.Optional; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.doiboost.orcid.json.JsonHelper;
import eu.dnetlib.doiboost.orcid.model.AuthorData;
import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi;
import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@ -16,24 +23,17 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.doiboost.orcid.json.JsonHelper;
import eu.dnetlib.doiboost.orcid.model.AuthorData;
import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi;
import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher;
import scala.Tuple2; import scala.Tuple2;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/** /**
* This spark job generates one parquet file, containing orcid publications dataset * This spark job generates one parquet file, containing orcid publications dataset
*/ */
@ -42,6 +42,8 @@ public class SparkGenEnrichedOrcidWorks {
static Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class); static Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws IOException, Exception { public static void main(String[] args) throws IOException, Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -58,6 +60,7 @@ public class SparkGenEnrichedOrcidWorks {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath"); final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath");
final String outputWorksPath = parser.get("outputWorksPath"); final String outputWorksPath = parser.get("outputWorksPath");
final String hdfsServerUri = parser.get("hdfsServerUri");
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -96,7 +99,6 @@ public class SparkGenEnrichedOrcidWorks {
Encoders.tuple(Encoders.STRING(), Encoders.STRING())) Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.toJavaRDD(); .toJavaRDD();
enrichedWorksRDD.saveAsTextFile(workingPath + "enrichedWorksText/");
logger.info("Enriched works RDD ready."); logger.info("Enriched works RDD ready.");
final LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications"); final LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications");
@ -124,15 +126,17 @@ public class SparkGenEnrichedOrcidWorks {
}) })
.filter(p -> p != null); .filter(p -> p != null);
Dataset<Publication> publicationDataset = spark oafPublicationRDD
.createDataset( .mapToPair(
oafPublicationRDD.repartition(1).rdd(), p -> new Tuple2<>(p.getClass().toString(),

I just noticed the output format being set to parquet. As the records in this set must be integrated in the graph via the so called Actions Management system, the data created by this procedure should comply with the input format & model it requires, i.e. a

SequenceFile<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> where

I just noticed the output format being set to parquet. As the records in this set must be integrated in the graph via the so called Actions Management system, the data created by this procedure should comply with the input format & model it requires, i.e. a ```SequenceFile<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>``` where * keys are defined as the entity type class fully qualified name (e.g. [`eu.dnetlib.dhp.schema.oaf.Publication`](https://code-repo.d4science.org/D-Net/dnet-hadoop/src/branch/master/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java)) * values are defined as [`eu.dnetlib.dhp.schema.action.AtomicAction`](https://code-repo.d4science.org/D-Net/dnet-hadoop/src/branch/master/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicAction.java)s, a simple wrapper class with just two fields: 1) `Class<T> clazz`; and 2) `T payload`; where `T extends Oaf`.
Encoders.bean(Publication.class)); OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, (Publication) p))))
publicationDataset .mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.write() .saveAsNewAPIHadoopFile(
.format("parquet") workingPath.concat(outputEnrichedWorksPath),
.mode(SaveMode.Overwrite) Text.class,
.save(workingPath + outputEnrichedWorksPath); Text.class,
SequenceFileOutputFormat.class,
sc.hadoopConfiguration());
logger.info("parsedPublications: " + parsedPublications.value().toString()); logger.info("parsedPublications: " + parsedPublications.value().toString());
logger.info("enrichedPublications: " + enrichedPublications.value().toString()); logger.info("enrichedPublications: " + enrichedPublications.value().toString());