forked from D-Net/dnet-hadoop
Merge pull request 'orcid-no-doi' (#123) from enrico.ottonello/dnet-hadoop:orcid-no-doi into beta
Reviewed-on: D-Net/dnet-hadoop#123
This commit is contained in:
parent
7e2caafe84
commit
bf9e0d2d4f
|
@ -4,6 +4,7 @@ package eu.dnetlib.doiboost.orcidnodoi;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -32,10 +33,7 @@ import com.google.gson.JsonParser;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
import eu.dnetlib.dhp.schema.orcid.AuthorData;
|
import eu.dnetlib.dhp.schema.orcid.*;
|
||||||
import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
|
|
||||||
import eu.dnetlib.dhp.schema.orcid.Work;
|
|
||||||
import eu.dnetlib.dhp.schema.orcid.WorkDetail;
|
|
||||||
import eu.dnetlib.doiboost.orcid.json.JsonHelper;
|
import eu.dnetlib.doiboost.orcid.json.JsonHelper;
|
||||||
import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
|
import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
|
||||||
import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
|
import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
|
||||||
|
@ -111,6 +109,10 @@ public class SparkGenEnrichedOrcidWorks {
|
||||||
Encoders.bean(WorkDetail.class));
|
Encoders.bean(WorkDetail.class));
|
||||||
logger.info("Works data loaded: " + workDataset.count());
|
logger.info("Works data loaded: " + workDataset.count());
|
||||||
|
|
||||||
|
final LongAccumulator warnNotFoundContributors = spark
|
||||||
|
.sparkContext()
|
||||||
|
.longAccumulator("warnNotFoundContributors");
|
||||||
|
|
||||||
JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset
|
JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset
|
||||||
.joinWith(
|
.joinWith(
|
||||||
authorDataset,
|
authorDataset,
|
||||||
|
@ -119,7 +121,21 @@ public class SparkGenEnrichedOrcidWorks {
|
||||||
(MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) value -> {
|
(MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) value -> {
|
||||||
WorkDetail w = value._1;
|
WorkDetail w = value._1;
|
||||||
AuthorData a = value._2;
|
AuthorData a = value._2;
|
||||||
AuthorMatcher.match(a, w.getContributors());
|
if (w.getContributors() == null
|
||||||
|
|| (w.getContributors() != null && w.getContributors().size() == 0)) {
|
||||||
|
Contributor c = new Contributor();
|
||||||
|
c.setName(a.getName());
|
||||||
|
c.setSurname(a.getSurname());
|
||||||
|
c.setCreditName(a.getCreditName());
|
||||||
|
c.setOid(a.getOid());
|
||||||
|
List<Contributor> contributors = Arrays.asList(c);
|
||||||
|
w.setContributors(contributors);
|
||||||
|
if (warnNotFoundContributors != null) {
|
||||||
|
warnNotFoundContributors.add(1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
AuthorMatcher.match(a, w.getContributors());
|
||||||
|
}
|
||||||
return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w));
|
return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w));
|
||||||
},
|
},
|
||||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
||||||
|
@ -172,7 +188,7 @@ public class SparkGenEnrichedOrcidWorks {
|
||||||
OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, p))))
|
OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, p))))
|
||||||
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
|
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
|
||||||
.saveAsNewAPIHadoopFile(
|
.saveAsNewAPIHadoopFile(
|
||||||
workingPath.concat(outputEnrichedWorksPath),
|
outputEnrichedWorksPath,
|
||||||
Text.class,
|
Text.class,
|
||||||
Text.class,
|
Text.class,
|
||||||
SequenceFileOutputFormat.class,
|
SequenceFileOutputFormat.class,
|
||||||
|
@ -180,6 +196,7 @@ public class SparkGenEnrichedOrcidWorks {
|
||||||
|
|
||||||
logger.info("parsedPublications: " + parsedPublications.value().toString());
|
logger.info("parsedPublications: " + parsedPublications.value().toString());
|
||||||
logger.info("enrichedPublications: " + enrichedPublications.value().toString());
|
logger.info("enrichedPublications: " + enrichedPublications.value().toString());
|
||||||
|
logger.info("warnNotFoundContributors: " + warnNotFoundContributors.value().toString());
|
||||||
logger.info("errorsGeneric: " + errorsGeneric.value().toString());
|
logger.info("errorsGeneric: " + errorsGeneric.value().toString());
|
||||||
logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString());
|
logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString());
|
||||||
logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString());
|
logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString());
|
||||||
|
|
|
@ -7,9 +7,14 @@
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
|
<value>/data/orcid_activities_2020/no_doi_dataset_prod/</value>
|
||||||
<description>path where to store the action set</description>
|
<description>path where to store the action set</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>processOutputPath</name>
|
||||||
|
<value>/data/orcid_activities_2020/process_no_doi_dataset_prod</value>
|
||||||
|
<description>temporary path where to store the action set</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>spark2GenNoDoiDatasetMaxExecutors</name>
|
<name>spark2GenNoDoiDatasetMaxExecutors</name>
|
||||||
<value>40</value>
|
<value>40</value>
|
||||||
|
@ -66,7 +71,7 @@
|
||||||
|
|
||||||
<action name="ResetWorkingPath">
|
<action name="ResetWorkingPath">
|
||||||
<fs>
|
<fs>
|
||||||
<delete path='${workingPath}/no_doi_dataset'/>
|
<delete path='${processOutputPath}'/>
|
||||||
</fs>
|
</fs>
|
||||||
<ok to="GenOrcidNoDoiDataset"/>
|
<ok to="GenOrcidNoDoiDataset"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -92,7 +97,7 @@
|
||||||
<arg>--workingPath</arg><arg>${workingPath}/</arg>
|
<arg>--workingPath</arg><arg>${workingPath}/</arg>
|
||||||
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
|
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
|
||||||
<arg>--orcidDataFolder</arg><arg>last_orcid_dataset</arg>
|
<arg>--orcidDataFolder</arg><arg>last_orcid_dataset</arg>
|
||||||
<arg>--outputEnrichedWorksPath</arg><arg>no_doi_dataset</arg>
|
<arg>--outputEnrichedWorksPath</arg><arg>${processOutputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="importOrcidNoDoi"/>
|
<ok to="importOrcidNoDoi"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -100,7 +105,7 @@
|
||||||
|
|
||||||
<action name="importOrcidNoDoi">
|
<action name="importOrcidNoDoi">
|
||||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
<arg>${workingPath}/no_doi_dataset/*</arg>
|
<arg>${processOutputPath}/*</arg>
|
||||||
<arg>${outputPath}</arg>
|
<arg>${outputPath}</arg>
|
||||||
</distcp>
|
</distcp>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
Loading…
Reference in New Issue