forked from D-Net/dnet-hadoop
imported more diffs from master branch; code formatting
This commit is contained in:
parent
1eaad89a3c
commit
d9532446eb
|
@ -7,6 +7,10 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
|||
|
||||
public class ModelConstants {
|
||||
|
||||
public static final String ORCID = "orcid";
|
||||
public static final String ORCID_PENDING = "orcid_pending";
|
||||
public static final String ORCID_CLASSNAME = "Open Researcher and Contributor ID";
|
||||
|
||||
public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
|
||||
public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
|
||||
|
||||
|
|
|
@ -9,8 +9,8 @@ import java.util.List;
|
|||
*/
|
||||
|
||||
public class BipScore implements Serializable {
|
||||
private String id; //doi
|
||||
private List<Score> scoreList; //unit as given in the inputfile
|
||||
private String id; // doi
|
||||
private List<Score> scoreList; // unit as given in the inputfile
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
|
|
|
@ -144,7 +144,6 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
|
||||
}
|
||||
|
||||
|
||||
private static List<Measure> getMeasure(BipScore value) {
|
||||
return value
|
||||
.getScoreList()
|
||||
|
|
|
@ -10,7 +10,6 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -22,6 +21,7 @@ import org.apache.spark.sql.expressions.Aggregator;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
|
@ -44,7 +44,7 @@ public class GroupEntitiesSparkJob {
|
|||
private final static String ID_JPATH = "$.id";
|
||||
|
||||
private static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package eu.dnetlib.doiboost.orcid
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
|
||||
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier}
|
||||
import org.apache.commons.lang.StringUtils
|
||||
|
@ -44,23 +44,19 @@ object ORCIDToOAF {
|
|||
}
|
||||
|
||||
|
||||
def convertTOOAF(input:ORCIDElement) :Publication = {
|
||||
val doi = input.doi
|
||||
def convertTOOAF(input:OrcidDOI) :Publication = {
|
||||
val doi = input.getDoi
|
||||
val pub:Publication = new Publication
|
||||
pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
|
||||
pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava)
|
||||
pub.setDataInfo(generateDataInfo())
|
||||
|
||||
//IMPORTANT
|
||||
//The old method pub.setId(IdentifierFactory.createIdentifier(pub))
|
||||
//will be replaced using IdentifierFactory
|
||||
pub.setId(generateIdentifier(pub, doi.toLowerCase))
|
||||
pub.setId(IdentifierFactory.createIdentifier(pub))
|
||||
|
||||
|
||||
try{
|
||||
pub.setAuthor(input.authors.map(a=> {
|
||||
generateAuthor(a.name, a.surname, a.creditName, a.oid)
|
||||
}).asJava)
|
||||
|
||||
val l:List[Author]= input.getAuthors.asScala.map(a=> {
|
||||
generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid)
|
||||
})(collection.breakOut)
|
||||
|
||||
pub.setAuthor(l.asJava)
|
||||
pub.setCollectedfrom(List(DoiBoostMappingUtil.createORIDCollectedFrom()).asJava)
|
||||
pub.setDataInfo(DoiBoostMappingUtil.generateDataInfo())
|
||||
pub
|
||||
|
@ -71,6 +67,13 @@ object ORCIDToOAF {
|
|||
}
|
||||
}
|
||||
|
||||
def generateOricPIDDatainfo():DataInfo = {
|
||||
val di =DoiBoostMappingUtil.generateDataInfo("0.91")
|
||||
di.getProvenanceaction.setClassid("sysimport:crosswalk:entityregistry")
|
||||
di.getProvenanceaction.setClassname("Harvested")
|
||||
di
|
||||
}
|
||||
|
||||
def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = {
|
||||
val a = new Author
|
||||
a.setName(given)
|
||||
|
@ -80,10 +83,10 @@ object ORCIDToOAF {
|
|||
else
|
||||
a.setFullname(s"$given $family")
|
||||
if (StringUtils.isNotBlank(orcid))
|
||||
a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava)
|
||||
a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateOricPIDDatainfo())).asJava)
|
||||
|
||||
a
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -45,24 +45,24 @@ object SparkConvertORCIDToOAF {
|
|||
Encoders.kryo(classOf[Publication])
|
||||
}
|
||||
|
||||
def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
|
||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
|
||||
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
|
||||
def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
|
||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
|
||||
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
|
||||
val mapper = new ObjectMapper()
|
||||
mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
|
||||
|
||||
val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
|
||||
val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
|
||||
|
||||
logger.info("Converting ORCID to OAF")
|
||||
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
|
||||
.map(d => (d.getId, d))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(getPublicationAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.write.mode(SaveMode.Overwrite).save(targetPath)
|
||||
}
|
||||
logger.info("Converting ORCID to OAF")
|
||||
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
|
||||
.map(d => (d.getId, d))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(getPublicationAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.write.mode(SaveMode.Overwrite).save(targetPath)
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
|
@ -85,4 +85,4 @@ def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
|
|||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -10,6 +10,7 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
|
||||
import javax.swing.text.html.Option;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerException;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
|
@ -42,6 +43,10 @@ public class XmlIndexingJob {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
|
||||
|
||||
public enum OutputFormat {
|
||||
SOLR, HDFS
|
||||
}
|
||||
|
||||
private static final Integer DEFAULT_BATCH_SIZE = 1000;
|
||||
|
||||
protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
|
||||
|
@ -52,6 +57,8 @@ public class XmlIndexingJob {
|
|||
|
||||
private int batchSize;
|
||||
|
||||
private OutputFormat outputFormat;
|
||||
|
||||
private String outputPath;
|
||||
|
||||
private SparkSession spark;
|
||||
|
@ -80,14 +87,22 @@ public class XmlIndexingJob {
|
|||
|
||||
final String outputPath = Optional
|
||||
.ofNullable(parser.get("outputPath"))
|
||||
.map(StringUtils::trim)
|
||||
.orElse(null);
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final Integer batchSize = parser.getObjectMap().containsKey("batchSize")
|
||||
? Integer.valueOf(parser.get("batchSize"))
|
||||
: DEFAULT_BATCH_SIZE;
|
||||
final Integer batchSize = Optional
|
||||
.ofNullable(parser.get("batchSize"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(DEFAULT_BATCH_SIZE);
|
||||
log.info("batchSize: {}", batchSize);
|
||||
|
||||
final OutputFormat outputFormat = Optional
|
||||
.ofNullable(parser.get("outputFormat"))
|
||||
.map(OutputFormat::valueOf)
|
||||
.orElse(OutputFormat.SOLR);
|
||||
log.info("outputFormat: {}", outputFormat);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
conf.registerKryoClasses(new Class[] {
|
||||
SerializableSolrInputDocument.class
|
||||
|
@ -100,15 +115,18 @@ public class XmlIndexingJob {
|
|||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
|
||||
new XmlIndexingJob(spark, inputPath, format, batchSize, outputPath).run(isLookup);
|
||||
new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, outputPath).run(isLookup);
|
||||
});
|
||||
}
|
||||
|
||||
public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, String outputPath) {
|
||||
public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize,
|
||||
OutputFormat outputFormat,
|
||||
String outputPath) {
|
||||
this.spark = spark;
|
||||
this.inputPath = inputPath;
|
||||
this.format = format;
|
||||
this.batchSize = batchSize;
|
||||
this.outputFormat = outputFormat;
|
||||
this.outputPath = outputPath;
|
||||
}
|
||||
|
||||
|
@ -137,17 +155,22 @@ public class XmlIndexingJob {
|
|||
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
|
||||
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s));
|
||||
|
||||
if (StringUtils.isNotBlank(outputPath)) {
|
||||
spark
|
||||
.createDataset(
|
||||
docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
|
||||
Encoders.kryo(SerializableSolrInputDocument.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(outputPath);
|
||||
} else {
|
||||
final String collection = ProvisionConstants.getCollectionName(format);
|
||||
SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
|
||||
switch (outputFormat) {
|
||||
case SOLR:
|
||||
final String collection = ProvisionConstants.getCollectionName(format);
|
||||
SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
|
||||
break;
|
||||
case HDFS:
|
||||
spark
|
||||
.createDataset(
|
||||
docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
|
||||
Encoders.kryo(SerializableSolrInputDocument.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(outputPath);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid outputFormat: " + outputFormat);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue