output to one parquet file

This commit is contained in:
Enrico Ottonello 2020-07-30 18:38:07 +02:00
parent 196f36c6ed
commit 0377b40fba
2 changed files with 8 additions and 3 deletions

View File

@@ -123,7 +123,7 @@ public class SparkGenEnrichedOrcidWorks {
Dataset<Publication> publicationDataset = spark Dataset<Publication> publicationDataset = spark
.createDataset( .createDataset(
oafPublicationRDD.rdd(), oafPublicationRDD.repartition(1).rdd(),
Encoders.bean(Publication.class)); Encoders.bean(Publication.class));
publicationDataset publicationDataset
.write() .write()

View File

@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
import com.google.gson.*; import com.google.gson.*;
import eu.dnetlib.dhp.common.PacePerson; import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility; import eu.dnetlib.doiboost.orcidnodoi.util.DumpToActionsUtility;
@@ -217,6 +218,8 @@ public class PublicationToOaf implements Serializable {
final List<String> urls = createRepeatedField(rootElement, "urls"); final List<String> urls = createRepeatedField(rootElement, "urls");
if (urls != null && !urls.isEmpty()) { if (urls != null && !urls.isEmpty()) {
instance.setUrl(urls); instance.setUrl(urls);
} else {
dataInfo.setInvisible(true);
} }
final String pubDate = getPublicationDate(rootElement, "publicationDates"); final String pubDate = getPublicationDate(rootElement, "publicationDates");
@@ -508,8 +511,10 @@ public class PublicationToOaf implements Serializable {
final StructuredProperty sp = new StructuredProperty(); final StructuredProperty sp = new StructuredProperty();
sp.setValue(orcidId); sp.setValue(orcidId);
final Qualifier q = new Qualifier(); final Qualifier q = new Qualifier();
q.setClassid("ORCID"); q.setClassid(ORCID.toLowerCase());
q.setClassname("ORCID"); q.setClassname(ORCID.toLowerCase());
q.setSchemeid(ModelConstants.DNET_PID_TYPES);
q.setSchemename(ModelConstants.DNET_PID_TYPES);
sp.setQualifier(q); sp.setQualifier(q);
return sp; return sp;
} }