diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
index 0033978bf..d4d414fed 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
@@ -62,8 +62,16 @@ public class SolrAdminApplication implements Closeable {
         final String collection = ProvisionConstants.getCollectionName(format);
         log.info("collection: {}", collection);

-        try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
-            app.execute(action, collection, query, commit);
+        final boolean shouldIndex = Optional
+            .ofNullable(parser.get("shouldIndex"))
+            .map(Boolean::valueOf)
+            .orElse(false);
+        log.info("shouldIndex: {}", shouldIndex);
+
+        if (shouldIndex) {
+            try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
+                app.execute(action, collection, query, commit);
+            }
         }
     }

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
index 6f43ca3f7..4353e863f 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.provision;

 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
+import static org.apache.spark.sql.functions.*;

 import java.util.List;
 import java.util.Map;
@@ -14,22 +15,31 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;

 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
 import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
+import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
 import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
 import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
+import eu.dnetlib.dhp.schema.solr.SolrRecord;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;

 /**
@@ -65,13 +75,20 @@ public class XmlConverterJob {
         final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
         log.info("contextApiBaseUrl: {}", contextApiBaseUrl);

+        String isLookupUrl = parser.get("isLookupUrl");
+        log.info("isLookupUrl: {}", isLookupUrl);
+
+        ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
+
         final SparkConf conf = new SparkConf();
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());

         runWithSparkSession(conf, isSparkSessionManaged, spark -> {
             removeOutputDir(spark, outputPath);
-            convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl));
+            convertToXml(
+                spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl),
+                VocabularyGroup.loadVocsFromIS(isLookup));
         });
     }

@@ -79,7 +96,8 @@ public class XmlConverterJob {
         final SparkSession spark,
         final String inputPath,
         final String outputPath,
-        final ContextMapper contextMapper) {
+        final ContextMapper contextMapper,
+        final VocabularyGroup vocabularies) {

         final XmlRecordFactory recordFactory = new XmlRecordFactory(
             prepareAccumulators(spark.sparkContext()),
@@ -92,20 +110,25 @@ public class XmlConverterJob {

         log.info("Found paths: {}", String.join(",", paths));

+        final ObjectMapper mapper = new ObjectMapper();
+        mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
+
         spark
             .read()
             .load(toSeq(paths))
             .as(Encoders.kryo(JoinedEntity.class))
             .map(
-                (MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
-                    je.getEntity().getId(),
-                    recordFactory.build(je)),
-                Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
-            .javaRDD()
-            .mapToPair(
-                (PairFunction<Tuple2<String, String>, Text, Text>) t -> new Tuple2<>(new Text(t._1()),
-                    new Text(t._2())))
-            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+                (MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
+                    recordFactory.build(je),
+                    ProvisionModelSupport.transform(je, contextMapper, vocabularies)),
+                Encoders.tuple(Encoders.STRING(), Encoders.bean(SolrRecord.class)))
+            .map(
+                (MapFunction<Tuple2<String, SolrRecord>, TupleWrapper>) t -> new TupleWrapper(
+                    t._1(), mapper.writeValueAsString(t._2())),
+                Encoders.bean(TupleWrapper.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
     }

     private static void removeOutputDir(final SparkSession spark, final String path) {
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index cd401c6cb..3c107a985 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -21,9 +21,8 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -31,11 +30,13 @@ import com.lucidworks.spark.util.SolrSupport;

 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
 import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
 import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import scala.Tuple2;

 public class XmlIndexingJob {

@@ -59,6 +60,8 @@ public class XmlIndexingJob {

     private final String outputPath;

+    private boolean shouldIndex;
+
     private final SparkSession spark;

     public static void main(String[] args) throws Exception {
@@ -101,7 +104,14 @@ public class XmlIndexingJob {
             .orElse(OutputFormat.SOLR);
         log.info("outputFormat: {}", outputFormat);

+        final boolean shouldIndex = Optional
+            .ofNullable(parser.get("shouldIndex"))
+            .map(Boolean::valueOf)
+            .orElse(false);
+        log.info("shouldIndex: {}", shouldIndex);
+
         final SparkConf conf = new SparkConf();
+
         conf.registerKryoClasses(new Class[] {
             SerializableSolrInputDocument.class
         });
@@ -113,18 +123,19 @@ public class XmlIndexingJob {
             final String isLookupUrl = parser.get("isLookupUrl");
             log.info("isLookupUrl: {}", isLookupUrl);
             final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
-            new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, outputPath).run(isLookup);
+            new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, shouldIndex, outputPath)
+                .run(isLookup);
         });
     }

     public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize,
-        OutputFormat outputFormat,
-        String outputPath) {
+        OutputFormat outputFormat, boolean shouldIndex, String outputPath) {
         this.spark = spark;
         this.inputPath = inputPath;
         this.format = format;
         this.batchSize = batchSize;
         this.outputFormat = outputFormat;
+        this.shouldIndex = shouldIndex;
         this.outputPath = outputPath;
     }

@@ -140,33 +151,45 @@ public class XmlIndexingJob {
         final String zkHost = isLookup.getZkHost();
         log.info("zkHost: {}", zkHost);

-        final String version = getRecordDatestamp();
-
         final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
         log.info("indexRecordTransformer {}", indexRecordXslt);

-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
-        JavaRDD<SolrInputDocument> docs = sc
-            .sequenceFile(inputPath, Text.class, Text.class)
-            .map(t -> t._2().toString())
-            .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
-            .map(s -> new StreamingInputDocumentFactory().parseDocument(s));
+        final Encoder<TupleWrapper> encoder = Encoders.bean(TupleWrapper.class);
+        final Dataset<TupleWrapper> records = spark
+            .read()
+            .schema(encoder.schema())
+            .json(inputPath)
+            .as(encoder)
+            .map(
+                (MapFunction<TupleWrapper, TupleWrapper>) t -> new TupleWrapper(
+                    toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), t.getXml()),
+                    t.getJson()),
+                Encoders.bean(TupleWrapper.class));

         switch (outputFormat) {
             case SOLR:
-                final String collection = ProvisionConstants.getCollectionName(format);
+                if (shouldIndex) {
+                    final String collection = ProvisionConstants.getCollectionName(format);

-                // SparkSolr >= 4
-                // com.lucidworks.spark.BatchSizeType bt = com.lucidworks.spark.BatchSizeType.NUM_DOCS;
-                // SolrSupport.indexDocs(zkHost, collection, batchSize, bt, docs.rdd());
-                // SparkSolr < 4
-                SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+                    // SparkSolr >= 4
+                    // com.lucidworks.spark.BatchSizeType bt = com.lucidworks.spark.BatchSizeType.NUM_DOCS;
+                    // SolrSupport.indexDocs(zkHost, collection, batchSize, bt, docs.rdd());
+                    // SparkSolr < 4
+                    JavaRDD<SolrInputDocument> docs = records
+                        .javaRDD()
+                        .map(
+                            t -> new StreamingInputDocumentFactory().parseDocument(t.getXml(), t.getJson()));
+                    SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+                }
                 break;
             case HDFS:
-                spark
-                    .createDataset(
-                        docs.map(SerializableSolrInputDocument::new).rdd(),
+                records
+                    .map(
+                        (MapFunction<TupleWrapper, SerializableSolrInputDocument>) t -> {
+                            SolrInputDocument s = new StreamingInputDocumentFactory()
+                                .parseDocument(t.getXml(), t.getJson());
+                            return new SerializableSolrInputDocument(s);
+                        },
                         Encoders.kryo(SerializableSolrInputDocument.class))
                     .write()
                     .mode(SaveMode.Overwrite)
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
index 0fb109fbb..da3915aee 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
@@ -5,11 +5,15 @@ import java.io.Serializable;
 import java.util.LinkedList;
 import java.util.List;

-import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;

-public class JoinedEntity<E extends OafEntity> implements Serializable {
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.oaf.*;

-    private E entity;
+public class JoinedEntity implements Serializable {
+
+    private OafEntity entity;

     private List<RelatedEntityWrapper> links;

@@ -17,16 +21,16 @@ public class JoinedEntity<E extends OafEntity> implements Serializable {
         links = new LinkedList<>();
     }

-    public JoinedEntity(E entity) {
+    public JoinedEntity(OafEntity entity) {
         this();
         this.entity = entity;
     }

-    public E getEntity() {
+    public OafEntity getEntity() {
         return entity;
     }

-    public void setEntity(E entity) {
+    public void setEntity(OafEntity entity) {
         this.entity = entity;
     }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
index d4ee24c14..6af692ed9 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
@@ -1,13 +1,46 @@

 package eu.dnetlib.dhp.oa.provision.model;

-import java.util.List;
+import static org.apache.commons.lang3.StringUtils.substringBefore;

+import java.io.StringReader;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.io.SAXReader;
+import org.jetbrains.annotations.Nullable;
+
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;

+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm;
 import eu.dnetlib.dhp.oa.provision.RelationList;
 import eu.dnetlib.dhp.oa.provision.SortableRelation;
+import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
+import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.solr.*;
+import eu.dnetlib.dhp.schema.solr.AccessRight;
+import eu.dnetlib.dhp.schema.solr.Author;
+import eu.dnetlib.dhp.schema.solr.Context;
+import eu.dnetlib.dhp.schema.solr.Country;
+import eu.dnetlib.dhp.schema.solr.Datasource;
+import eu.dnetlib.dhp.schema.solr.EoscIfGuidelines;
+import eu.dnetlib.dhp.schema.solr.Instance;
+import eu.dnetlib.dhp.schema.solr.Journal;
+import eu.dnetlib.dhp.schema.solr.OpenAccessColor;
+import eu.dnetlib.dhp.schema.solr.OpenAccessRoute;
+import eu.dnetlib.dhp.schema.solr.Organization;
+import eu.dnetlib.dhp.schema.solr.Project;
+import eu.dnetlib.dhp.schema.solr.Result;
+import eu.dnetlib.dhp.schema.solr.Subject;

 public class ProvisionModelSupport {

@@ -28,4 +61,631 @@ public class ProvisionModelSupport {
                 RelationList.class));
         return modelClasses.toArray(new Class[] {});
     }
+
+    public static SolrRecord transform(JoinedEntity je, ContextMapper contextMapper, VocabularyGroup vocs) {
+        SolrRecord s = new SolrRecord();
+        final OafEntity e = je.getEntity();
+        s
+            .setHeader(
+                SolrRecordHeader
+                    .newInstance(
+                        e.getId(), e.getOriginalId(), RecordType.valueOf(e.getClass().getSimpleName().toLowerCase())));
+        s.setCollectedfrom(asProvenance(e.getCollectedfrom()));
+        s.setContext(asContext(e.getContext(), contextMapper));
+        s.setPid(asPid(e.getPid()));
+
+        if (e instanceof eu.dnetlib.dhp.schema.oaf.Result) {
+            s.setResult(mapResult((eu.dnetlib.dhp.schema.oaf.Result) e));
+        } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Datasource) {
+            s.setDatasource(mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) e));
+        } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Organization) {
+            s.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e));
+        } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Project) {
+            s.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs));
+        }
+        s
+            .setLinks(
+                Optional
+                    .ofNullable(je.getLinks())
+                    .map(
+                        links -> links
+                            .stream()
+                            .map(rew -> mapRelatedRecord(rew, vocs))
+                            .collect(Collectors.toList()))
+                    .orElse(null));
+
+        return s;
+    }
+
+    private static RelatedRecord mapRelatedRecord(RelatedEntityWrapper rew, VocabularyGroup vocs) {
+        RelatedRecord rr = new RelatedRecord();
+
+        final RelatedEntity re = rew.getTarget();
+        final RecordType relatedRecordType = RecordType.valueOf(re.getType());
+        final Relation relation = rew.getRelation();
+        rr
+            .setHeader(
+                RelatedRecordHeader
+                    .newInstance(
+                        relation.getRelType(),
+                        relation.getRelClass(),
+                        relation.getTarget(), relatedRecordType));
+
+        rr.setAcronym(re.getAcronym());
+        rr.setCode(re.getCode());
+        rr.setContracttype(mapCodeLabel(re.getContracttype()));
+        rr.setCollectedfrom(asProvenance(re.getCollectedfrom()));
+        rr.setCodeRepositoryUrl(re.getCodeRepositoryUrl());
+        rr.setCountry(asCountry(re.getCountry()));
+        rr.setDatasourcetype(mapCodeLabel(re.getDatasourcetype()));
+        rr.setDatasourcetypeui(mapCodeLabel(re.getDatasourcetypeui()));
+        rr.setDateofacceptance(re.getDateofacceptance());
+        rr.setFunding(mapFunding(re.getFundingtree(), vocs));
+        rr.setInstances(mapInstances(re.getInstances()));
+        rr.setLegalname(re.getLegalname());
+        rr.setLegalshortname(re.getLegalshortname());
+        rr.setOfficialname(re.getOfficialname());
+        rr.setOpenairecompatibility(mapCodeLabel(re.getOpenairecompatibility()));
+        rr.setPid(asPid(re.getPid()));
+        rr.setProjectTitle(re.getProjectTitle());
+        rr.setPublisher(re.getPublisher());
+        rr.setResulttype(mapQualifier(re.getResulttype()));
+        rr.setTitle(Optional.ofNullable(re.getTitle()).map(StructuredProperty::getValue).orElse(null));
+
+        return rr;
+    }
+
+    private static Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p, VocabularyGroup vocs) {
+        Project ps = new Project();
+        ps.setAcronym(mapField(p.getAcronym()));
+        ps.setCode(mapField(p.getCode()));
+        ps.setContracttype(mapCodeLabel(p.getContracttype()));
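+        // remaining attributes are copied from the oaf Project, unwrapping Field values via mapField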
+        ps.setCurrency(mapField(p.getCurrency()));
+        ps.setDuration(mapField(p.getDuration()));
+        ps.setCallidentifier(mapField(p.getCallidentifier()));
+        ps.setEcarticle29_3(mapField(p.getEcarticle29_3()));
+        ps.setEnddate(mapField(p.getEnddate()));
+        ps.setFundedamount(p.getFundedamount());
+        ps.setKeywords(mapField(p.getKeywords()));
+        ps.setStartdate(mapField(p.getStartdate()));
+        ps.setSubjects(asSubjectSP(p.getSubjects()));
+        ps.setSummary(mapField(p.getSummary()));
+        ps.setTitle(mapField(p.getTitle()));
+        ps.setTotalcost(p.getTotalcost());
+        ps.setWebsiteurl(mapField(p.getWebsiteurl()));
+        ps.setFunding(mapFundingField(p.getFundingtree(), vocs));
+        return ps;
+    }
+
+    private static Funding mapFunding(List<String> fundingtree, VocabularyGroup vocs) {
+        SAXReader reader = new SAXReader();
+        return Optional
+            .ofNullable(fundingtree)
+            .flatMap(
+                ftree -> ftree
+                    .stream()
+                    .map(ft -> {
+                        try {
+                            Document doc = reader.read(new StringReader(ft));
+                            String countryCode = doc.valueOf("/fundingtree/funder/jurisdiction/text()");
+                            Country country = vocs
+                                .find("dnet:countries")
+                                .map(voc -> voc.getTerm(countryCode))
+                                .map(VocabularyTerm::getName)
+                                .map(label -> Country.newInstance(countryCode, label))
+                                .orElse(null);
+
+                            String level0_id = doc.valueOf("//funding_level_0/id/text()");
+                            String level1_id = doc.valueOf("//funding_level_1/id/text()");
+                            String level2_id = doc.valueOf("//funding_level_2/id/text()");
+
+                            return Funding
+                                .newInstance(
+                                    Funder
+                                        .newInstance(
+                                            doc.valueOf("/fundingtree/funder/id/text()"),
+                                            doc.valueOf("/fundingtree/funder/shortname/text()"),
+                                            doc.valueOf("/fundingtree/funder/name/text()"),
+                                            country, new ArrayList<>()),
+                                    Optional
+                                        .ofNullable(level0_id)
+                                        .map(
+                                            id -> FundingLevel
+                                                .newInstance(
+                                                    id,
+                                                    doc.valueOf("//funding_level_0/description/text()"),
+                                                    doc.valueOf("//funding_level_0/name/text()")))
+                                        .orElse(null),
+                                    Optional
+                                        .ofNullable(level1_id)
+                                        .map(
+                                            id -> FundingLevel
+                                                .newInstance(
+                                                    id,
+                                                    doc.valueOf("//funding_level_1/description/text()"),
+                                                    doc.valueOf("//funding_level_1/name/text()")))
+                                        .orElse(null),
+                                    Optional
+                                        .ofNullable(level2_id)
+                                        .map(
+                                            id -> FundingLevel
+                                                .newInstance(
+                                                    id,
+                                                    doc.valueOf("//funding_level_2/description/text()"),
+                                                    doc.valueOf("//funding_level_2/name/text()")))
+                                        .orElse(null));
+
+                        } catch (DocumentException e) {
+                            throw new IllegalArgumentException(e);
+                        }
+                    })
+                    .findFirst())
+            .orElse(null);
+    }
+
+    private static Funding mapFundingField(List<Field<String>> fundingtree, VocabularyGroup vocs) {
+        return mapFunding(
+            Optional
+                .ofNullable(fundingtree)
+                .map(fts -> fts.stream().map(Field::getValue).collect(Collectors.toList()))
+                .orElse(null),
+            vocs);
+    }
+
+    private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization o) {
+        Organization org = new Organization();
+        org.setCountry(mapCodeLabel(o.getCountry()));
+        org.setLegalname(mapField(o.getLegalname()));
+        org.setLegalshortname(mapField(o.getLegalshortname()));
+        org.setAlternativeNames(mapFieldList(o.getAlternativeNames()));
+        org.setWebsiteurl(mapField(o.getWebsiteurl()));
+        org.setLogourl(mapField(o.getLogourl()));
+
+        org.setEcenterprise(mapField(o.getEcenterprise()));
+        org.setEchighereducation(mapField(o.getEchighereducation()));
+        org.setEclegalbody(mapField(o.getEclegalbody()));
+        org.setEcinternationalorganization(mapField(o.getEcinternationalorganization()));
+        org.setEcinternationalorganizationeurinterests(mapField(o.getEcinternationalorganizationeurinterests()));
+        org.setEclegalperson(mapField(o.getEclegalperson()));
+        org.setEcnonprofit(mapField(o.getEcnonprofit()));
+        org.setEcnutscode(mapField(o.getEcnutscode()));
+        org.setEcresearchorganization(mapField(o.getEcresearchorganization()));
+        org.setEcsmevalidated(mapField(o.getEcsmevalidated()));
+
+        return org;
+    }
+
+    private static Datasource mapDatasource(eu.dnetlib.dhp.schema.oaf.Datasource d) {
+        Datasource ds = new Datasource();
+        ds.setEnglishname(mapField(d.getEnglishname()));
+        ds.setOfficialname(mapField(d.getOfficialname()));
+        ds.setDescription(mapField(d.getDescription()));
+        ds.setJournal(mapJournal(d.getJournal()));
+        ds.setLogourl(mapField(d.getLogourl()));
+        ds.setAccessinfopackage(mapFieldList(d.getAccessinfopackage()));
+        ds.setCertificates(mapField(d.getCertificates()));
+        ds.setCitationguidelineurl(mapField(d.getCitationguidelineurl()));
+        ds.setConsenttermsofuse(d.getConsenttermsofuse());
+        ds.setConsenttermsofusedate(d.getConsenttermsofusedate());
+        ds.setContactemail(mapField(d.getContactemail()));
+        ds.setContentpolicies(mapCodeLabel(d.getContentpolicies()));
+        ds.setDatabaseaccessrestriction(mapField(d.getDatabaseaccessrestriction()));
+        ds.setDatabaseaccesstype(mapField(d.getDatabaseaccesstype()));
+        ds.setDataprovider(mapField(d.getDataprovider()));
+        ds.setDatasourcetype(mapCodeLabel(d.getDatasourcetype()));
+        ds.setDatasourcetypeui(mapCodeLabel(d.getDatasourcetypeui()));
+        ds.setDatauploadrestriction(mapField(d.getDatauploadrestriction()));
+        ds.setDatauploadtype(mapField(d.getDatauploadtype()));
+        ds.setDateofvalidation(mapField(d.getDateofvalidation()));
+        ds.setEoscdatasourcetype(mapCodeLabel(d.getEoscdatasourcetype()));
+        ds.setEosctype(mapCodeLabel(d.getEosctype()));
+        ds.setFulltextdownload(d.getFulltextdownload());
+        ds.setJurisdiction(mapCodeLabel(d.getJurisdiction()));
+        ds.setLanguages(d.getLanguages());
+        ds.setLatitude(mapField(d.getLatitude()));
+        ds.setLongitude(mapField(d.getLongitude()));
+        ds.setLastconsenttermsofusedate(d.getLastconsenttermsofusedate());
+        ds.setMissionstatementurl(mapField(d.getMissionstatementurl()));
+        ds.setNamespaceprefix(mapField(d.getNamespaceprefix()));
+        ds.setOdcontenttypes(mapFieldList(d.getOdcontenttypes()));
+        ds.setOdlanguages(mapFieldList(d.getOdlanguages()));
+        ds.setOdnumberofitems(mapField(d.getOdnumberofitems()));
+        ds.setOdnumberofitemsdate(mapField(d.getOdnumberofitemsdate()));
+        ds.setOdpolicies(mapField(d.getOdpolicies()));
+        ds.setOpenairecompatibility(mapCodeLabel(d.getOpenairecompatibility()));
+        ds.setPidsystems(mapField(d.getPidsystems()));
+        ds.setPolicies(mapCodeLabelKV(d.getPolicies()));
+        ds.setPreservationpolicyurl(d.getPreservationpolicyurl());
+        ds.setProvidedproducttypes(d.getProvidedproducttypes());
+        ds.setReleaseenddate(mapField(d.getReleaseenddate()));
+        ds.setReleasestartdate(mapField(d.getReleasestartdate()));
+        ds.setResearchentitytypes(d.getResearchentitytypes());
+        ds.setResearchproductaccesspolicies(d.getResearchproductaccesspolicies());
+        ds.setResearchproductmetadataaccesspolicies(d.getResearchproductmetadataaccesspolicies());
+        ds.setServiceprovider(mapField(d.getServiceprovider()));
+        ds.setSubjects(asSubjectSP(d.getSubjects()));
+        ds.setSubmissionpolicyurl(d.getSubmissionpolicyurl());
+        ds.setThematic(d.getThematic());
+        ds.setVersioncontrol(d.getVersioncontrol());
+        ds.setVersioning(mapField(d.getVersioning()));
+
+        return ds;
+    }
+
+    private static Result mapResult(eu.dnetlib.dhp.schema.oaf.Result r) {
+        Result rs = new Result();
+
+        rs.setResulttype(mapQualifier(r.getResulttype()));
+        rs.setAuthor(asAuthor(r.getAuthor()));
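+        // the title classified as "main title" feeds maintitle; any other titles go to otherTitles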
+        rs.setMaintitle(getMaintitle(r.getTitle()));
+        rs.setOtherTitles(getOtherTitles(r.getTitle()));
+        rs.setDescription(mapFieldList(r.getDescription()));
+        rs.setSubject(asSubject(r.getSubject()));
+        rs.setPublicationdate(mapField(r.getDateofacceptance()));
+        rs.setPublisher(mapField(r.getPublisher()));
+        rs.setEmbargoenddate(mapField(r.getEmbargoenddate()));
+        rs.setSource(mapFieldList(r.getSource()));
+        rs.setFormat(mapFieldList(r.getFormat()));
+        rs.setContributor(mapFieldList(r.getContributor()));
+        rs.setCoverage(mapFieldList(r.getCoverage()));
+        rs
+            .setBestaccessright(
+                BestAccessRight
+                    .newInstance(r.getBestaccessright().getClassid(), r.getBestaccessright().getClassname()));
+        rs.setFulltext(mapFieldList(r.getFulltext()));
+        rs.setCountry(asCountry(r.getCountry()));
+        rs.setEoscifguidelines(asEOSCIF(r.getEoscifguidelines()));
+
+        rs.setGreen(r.getIsGreen());
+        rs
+            .setOpenAccessColor(
+                Optional
+                    .ofNullable(r.getOpenAccessColor())
+                    .map(color -> OpenAccessColor.valueOf(color.toString()))
+                    .orElse(null));
+        rs.setInDiamondJournal(r.getIsInDiamondJournal());
+        rs.setPubliclyFunded(r.getPubliclyFunded());
+        rs.setTransformativeAgreement(r.getTransformativeAgreement());
+
+        rs.setInstance(mapInstances(r.getInstance()));
+
+        if (r instanceof Publication) {
+            Publication pub = (Publication) r;
+            rs.setJournal(mapJournal(pub.getJournal()));
+        } else if (r instanceof Dataset) {
+            Dataset d = (Dataset) r;
+            rs.setSize(mapField(d.getSize()));
+            rs.setVersion(mapField(d.getVersion()));
+        } else if (r instanceof Software) {
+            Software sw = (Software) r;
+            rs.setCodeRepositoryUrl(mapField(sw.getCodeRepositoryUrl()));
+            rs.setProgrammingLanguage(mapQualifier(sw.getProgrammingLanguage()));
+            rs.setDocumentationUrl(mapFieldList(sw.getDocumentationUrl()));
+        } else if (r instanceof OtherResearchProduct) {
+            OtherResearchProduct orp = (OtherResearchProduct) r;
+            rs.setContactperson(mapFieldList(orp.getContactperson()));
+            rs.setContactgroup(mapFieldList(orp.getContactgroup()));
+            rs.setTool(mapFieldList(orp.getTool()));
+        }
+        return rs;
+    }
+
+    @Nullable
+    private static List<String> getOtherTitles(List<StructuredProperty> titleList) {
+        return Optional
+            .ofNullable(titleList)
+            .map(
+                titles -> titles
+                    .stream()
+                    .filter(
+                        t -> !"main title"
+                            .equals(
+                                Optional
+                                    .ofNullable(t.getQualifier())
+                                    .map(Qualifier::getClassid)
+                                    .orElse(null)))
+                    .map(StructuredProperty::getValue)
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static String getMaintitle(List<StructuredProperty> titleList) {
+        return Optional
+            .ofNullable(titleList)
+            .flatMap(
+                titles -> titles
+                    .stream()
+                    .filter(
+                        t -> "main title"
+                            .equals(
+                                Optional
+                                    .ofNullable(t.getQualifier())
+                                    .map(Qualifier::getClassid)
+                                    .orElse(null)))
+                    .map(StructuredProperty::getValue)
+                    .findFirst())
+            .orElse(null);
+    }
+
+    private static List<Instance> mapInstances(List<eu.dnetlib.dhp.schema.oaf.Instance> instanceList) {
+        return Optional
+            .ofNullable(instanceList)
+            .map(
+                instances -> instances
+                    .stream()
+                    .map(instance -> {
+                        Instance i = new Instance();
+                        i.setCollectedfrom(asProvenance(instance.getCollectedfrom()));
+                        i.setHostedby(asProvenance(instance.getHostedby()));
+                        i.setFulltext(instance.getFulltext());
+                        i.setPid(asPid(instance.getPid()));
+                        i.setAlternateIdentifier(asPid(instance.getAlternateIdentifier()));
+                        i.setAccessright(mapAccessRight(instance.getAccessright()));
+                        i.setInstancetype(mapQualifier(instance.getInstancetype()));
+                        i.setLicense(mapField(instance.getLicense()));
+                        i.setUrl(instance.getUrl());
+                        i.setRefereed(mapQualifier(instance.getRefereed()));
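+                        // instance-level date, distribution location and article processing charges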
+                        i.setDateofacceptance(mapField(instance.getDateofacceptance()));
+                        i.setDistributionlocation(instance.getDistributionlocation());
+                        i.setProcessingcharges(getProcessingcharges(instance));
+                        return i;
+                    })
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static APC getProcessingcharges(eu.dnetlib.dhp.schema.oaf.Instance instance) {
+        return Optional
+            .of(
+                APC
+                    .newInstance(
+                        mapField(instance.getProcessingchargecurrency()),
+                        mapField(instance.getProcessingchargeamount())))
+            .filter(apc -> Objects.nonNull(apc.getAmount()) && Objects.nonNull(apc.getCurrency()))
+            .orElse(null);
+    }
+
+    private static AccessRight mapAccessRight(eu.dnetlib.dhp.schema.oaf.AccessRight accessright) {
+        return AccessRight
+            .newInstance(
+                mapQualifier(accessright),
+                Optional
+                    .ofNullable(accessright.getOpenAccessRoute())
+                    .map(route -> OpenAccessRoute.valueOf(route.toString()))
+                    .orElse(null));
+    }
+
+    private static <T> T mapField(eu.dnetlib.dhp.schema.oaf.Field<T> f) {
+        return Optional.ofNullable(f).map(Field::getValue).orElse(null);
+    }
+
+    private static List<String> mapFieldList(List<Field<String>> fl) {
+        return Optional
+            .ofNullable(fl)
+            .map(v -> v.stream().map(Field::getValue).collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static String mapQualifier(eu.dnetlib.dhp.schema.oaf.Qualifier q) {
+        return Optional.ofNullable(q).map(Qualifier::getClassid).orElse(null);
+    }
+
+    private static Journal mapJournal(eu.dnetlib.dhp.schema.oaf.Journal joaf) {
+        return Optional
+            .ofNullable(joaf)
+            .map(jo -> {
+                Journal j = new Journal();
+                j.setConferencedate(jo.getConferencedate());
+                j.setConferenceplace(jo.getConferenceplace());
+                j.setEdition(jo.getEdition());
+                j.setSp(jo.getSp());
+                j.setEp(jo.getEp());
+                j.setVol(jo.getVol());
+                j.setIss(jo.getIss());
+                j.setName(jo.getName());
+                j.setIssnPrinted(jo.getIssnPrinted());
+                j.setIssnOnline(jo.getIssnOnline());
+                j.setIssnLinking(jo.getIssnLinking());
+                return j;
+            })
+            .orElse(null);
+    }
+
+    private static List<Provenance> asProvenance(List<KeyValue> keyValueList) {
+        return Optional
+            .ofNullable(keyValueList)
+            .map(
+                kvs -> kvs
+                    .stream()
+                    .map(ProvisionModelSupport::asProvenance)
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static Provenance asProvenance(KeyValue keyValue) {
+        return Optional.ofNullable(keyValue).map(cf -> Provenance.newInstance(cf.getKey(), cf.getValue())).orElse(null);
+    }
+
+    private static List<Context> asContext(List<eu.dnetlib.dhp.schema.oaf.Context> ctxList,
+        ContextMapper contextMapper) {
+
+        final Set<String> contexts = Optional
+            .ofNullable(ctxList)
+            .map(
+                ctx -> ctx
+                    .stream()
+                    .map(eu.dnetlib.dhp.schema.oaf.Context::getId)
+                    .collect(Collectors.toCollection(HashSet::new)))
+            .orElse(new HashSet<>());
+
+        /* FIXME: Workaround for CLARIN mining issue: #3670#note-29 */
+        if (contexts.contains("dh-ch::subcommunity::2")) {
+            contexts.add("clarin");
+        }
+
+        return Optional
+            .ofNullable(contexts)
+            .map(
+                ctx -> ctx
+                    .stream()
+                    .map(contextPath -> {
+                        Context context = new Context();
+                        String id = "";
+                        Map<String, Category> categoryMap = Maps.newHashMap();
+                        for (final String token : Splitter.on("::").split(contextPath)) {
+                            id += token;
+
+                            final ContextDef def = contextMapper.get(id);
+
+                            if (def == null) {
+                                continue;
+                            }
+                            if (def.getName().equals("context")) {
+                                context.setId(def.getId());
+                                context.setLabel(def.getLabel());
+                                context.setType(def.getType());
+                            }
+                            if (def.getName().equals("category")) {
+                                Category category = Category.newInstance(def.getId(), def.getLabel());
+                                if (Objects.isNull(context.getCategory())) {
+                                    context.setCategory(Lists.newArrayList());
+                                }
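+                                // track the category by id so that child concepts can be attached to it below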
+                                context.getCategory().add(category);
+                                categoryMap.put(def.getId(), category);
+                            }
+                            if (def.getName().equals("concept")) {
+                                String parentId = StringUtils.substringBeforeLast(def.getId(), "::");
+                                if (categoryMap.containsKey(parentId)) {
+                                    categoryMap
+                                        .get(parentId)
+                                        .getConcept()
+                                        .add(Concept.newInstance(def.getId(), def.getLabel()));
+                                }
+                            }
+                            id += "::";
+                        }
+                        return context;
+                    })
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<Pid> asPid(List<StructuredProperty> pidList) {
+        return Optional
+            .ofNullable(pidList)
+            .map(
+                pids -> pids
+                    .stream()
+                    .map(p -> Pid.newInstance(p.getQualifier().getClassid(), p.getValue()))
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<Author> asAuthor(List<eu.dnetlib.dhp.schema.oaf.Author> authorList) {
+        return Optional
+            .ofNullable(authorList)
+            .map(
+                authors -> authors
+                    .stream()
+                    .map(
+                        a -> Author
+                            .newInstance(a.getFullname(), a.getName(), a.getSurname(), a.getRank(), asPid(a.getPid())))
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<Subject> asSubject(List<eu.dnetlib.dhp.schema.oaf.Subject> subjectList) {
+        return Optional
+            .ofNullable(subjectList)
+            .map(
+                subjects -> subjects
+                    .stream()
+                    .filter(s -> Objects.nonNull(s.getQualifier()))
+                    .filter(s -> Objects.nonNull(s.getQualifier().getClassid()))
+                    .map(s -> Subject.newInstance(s.getValue(), s.getQualifier().getClassid()))
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<Subject> asSubjectSP(List<StructuredProperty> subjectList) {
+        return Optional
+            .ofNullable(subjectList)
+            .map(
+                subjects -> subjects
+                    .stream()
+                    .filter(s -> Objects.nonNull(s.getQualifier()))
+                    .filter(s -> Objects.nonNull(s.getQualifier().getClassid()))
+                    .map(s -> Subject.newInstance(s.getValue(), s.getQualifier().getClassid()))
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static Country asCountry(eu.dnetlib.dhp.schema.oaf.Qualifier country) {
+        return Optional
+            .ofNullable(country)
+            .filter(c -> Objects.nonNull(c.getClassid()) && Objects.nonNull(c.getClassname()))
+            .map(c -> Country.newInstance(c.getClassid(), c.getClassname()))
+            .orElse(null);
+    }
+
+    private static List<Country> asCountry(List<eu.dnetlib.dhp.schema.oaf.Country> countryList) {
+        return Optional
+            .ofNullable(countryList)
+            .map(
+                countries -> countries
+                    .stream()
+                    .map(c -> Country.newInstance(c.getClassid(), c.getClassname()))
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<EoscIfGuidelines> asEOSCIF(List<eu.dnetlib.dhp.schema.oaf.EoscIfGuidelines> eoscIfGuidelines) {
+        return Optional
+            .ofNullable(eoscIfGuidelines)
+            .map(
+                eoscif -> eoscif
+                    .stream()
+                    .map(
+                        e -> EoscIfGuidelines
+                            .newInstance(e.getCode(), e.getLabel(), e.getUrl(), e.getSemanticRelation()))
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<CodeLabel> mapCodeLabelKV(List<KeyValue> kvList) {
+        return Optional
+            .ofNullable(kvList)
+            .map(
+                kvs -> kvs
+                    .stream()
+                    .map(ProvisionModelSupport::mapCodeLabel)
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static List<CodeLabel> mapCodeLabel(List<Qualifier> qualifiers) {
+        return Optional
+            .ofNullable(qualifiers)
+            .map(
+                list -> list
+                    .stream()
+                    .map(ProvisionModelSupport::mapCodeLabel)
+                    .collect(Collectors.toList()))
+            .orElse(null);
+    }
+
+    private static CodeLabel mapCodeLabel(Qualifier qualifier) {
+        return Optional
+            .ofNullable(qualifier)
+            .map(q -> CodeLabel.newInstance(q.getClassid(), q.getClassname()))
+            .orElse(null);
+    }
+
+    private static CodeLabel mapCodeLabel(KeyValue kv) {
+        return Optional
+            .ofNullable(kv)
+            .map(q -> CodeLabel.newInstance(q.getKey(), q.getValue()))
+            .orElse(null);
+    }
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java
index 5c78d1826..f13c6c90e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java
@@ -13,6 +13,8 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

 public class RelatedEntity implements Serializable {

+    private static final long serialVersionUID = -4982643490443810597L;
+
     private String id;
     private String type;

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java
index 4a4a4a5be..4939ec8e9 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java
@@ -9,6 +9,8 @@ import eu.dnetlib.dhp.schema.oaf.Relation;

 public class RelatedEntityWrapper implements Serializable {

+    private static final long serialVersionUID = -2624854064081757234L;
+
     private Relation relation;
     private RelatedEntity target;

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java
index b42f9ee83..51f65c7ac 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java
@@ -1,6 +1,7 @@

 package eu.dnetlib.dhp.oa.provision.utils;

+import java.io.Serializable;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.HashMap;
@@ -32,7 +33,7 @@ import com.google.common.collect.Lists;
  *
  * @author claudio
  */
-public class StreamingInputDocumentFactory {
+public class StreamingInputDocumentFactory implements Serializable {

     private static final String INDEX_FIELD_PREFIX = "__";

@@ -40,6 +41,8 @@ public class StreamingInputDocumentFactory {

     private static final String INDEX_RESULT = INDEX_FIELD_PREFIX + RESULT;

+    private static final String INDEX_JSON_RESULT = INDEX_FIELD_PREFIX + "json";
+
     private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier";

     private static final String DEFAULTDNETRESULT = "dnetResult";
@@ -71,13 +74,17 @@ public class StreamingInputDocumentFactory {
         this.resultName = resultName;
     }

-    public SolrInputDocument parseDocument(final String inputDocument) {
+    public SolrInputDocument parseDocument(final String xml) {
+        return parseDocument(xml, "");
+    }
+
+    public SolrInputDocument parseDocument(final String xml, final String json) {

         final StringWriter results = new StringWriter();
         final List<Namespace> nsList = Lists.newLinkedList();
         try {

-            XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
+            XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(xml));

             final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());

@@ -95,13 +102,13 @@ public class StreamingInputDocumentFactory {
                 } else if (TARGETFIELDS.equals(localName)) {
                     parseTargetFields(indexDocument, parser);
                 } else if (resultName.equals(localName)) {
-                    copyResult(indexDocument, results, parser, nsList, resultName);
+                    copyResult(indexDocument, json, results, parser, nsList, resultName);
                 }
             }
         }

         if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
-            throw new IllegalStateException("cannot extract record ID from: " + inputDocument);
+            throw new IllegalStateException("cannot extract record ID from: " + xml);
         }

         return indexDocument;
@@ -171,6 +178,7 @@ public class StreamingInputDocumentFactory {
      */
     protected void copyResult(
         final SolrInputDocument indexDocument,
+        final String json,
         final StringWriter results,
         final XMLEventReader parser,
         final List<Namespace> nsList,
@@ -205,6 +213,7 @@ public class StreamingInputDocumentFactory {
             }
             writer.close();
             indexDocument.addField(INDEX_RESULT, results.toString());
+            indexDocument.addField(INDEX_JSON_RESULT, json);
         } finally {
             outputFactory.remove();
             eventFactory.remove();
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 46286e06a..2960e5aaa 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -29,6 +29,12 @@
     "paramDescription": "decides the job output format, SOLR | HDFS",
     "paramRequired": false
   },
+  {
+    "paramName": "si",
+    "paramLongName": "shouldIndex",
+    "paramDescription": "should the action actually index the records?",
+    "paramRequired": true
+  },
   {
     "paramName": "op",
     "paramLongName": "outputPath",
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
index 653a69ed1..4509eb9de 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
@@ -16,5 +16,11 @@
     "paramLongName": "contextApiBaseUrl",
     "paramDescription": "URL of the context API",
     "paramRequired": true
+  },
+  {
+    "paramName": "isu",
+    "paramLongName": "isLookupUrl",
+    "paramDescription": "URL of the ISLookUp Service",
+    "paramRequired": true
   }
 ]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
index 23eca2f7b..53805d3c1 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
@@ -28,5 +28,11 @@
     "paramLongName": "commit",
     "paramDescription": "should the action be followed by a commit?",
     "paramRequired": false
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "shouldIndex",
+    "paramDescription": "should the action actually index the records?",
+    "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 9eab960f0..8908c5c83 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -591,22 +591,18 @@
                     --conf spark.sql.shuffle.partitions=3840
                     --conf spark.network.timeout=${sparkNetworkTimeout}
-                <arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
-                <arg>--outputPath</arg><arg>${workingDir}/xml</arg>
+                <arg>--inputPath</arg><arg>/user/claudio.atzori/data/provision/join_entities</arg>
+                <arg>--outputPath</arg><arg>/user/claudio.atzori/data/provision/xml_json_test</arg>
                 <arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
+                <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
-                [markup lost in extraction: a decision element switching on ${wf:conf('shouldIndex') eq 'true'} / ${wf:conf('shouldIndex') eq 'false'} is removed by this hunk]
@@ -621,6 +617,7 @@
                 <arg>--action</arg><arg>DELETE_BY_QUERY</arg>
                 <arg>--query</arg><arg>${solrDeletionQuery}</arg>
                 <arg>--commit</arg><arg>true</arg>
+                <arg>--shouldIndex</arg><arg>${shouldIndex}</arg>
@@ -646,12 +643,17 @@
                     --conf spark.hadoop.mapreduce.map.speculative=false
                     --conf spark.hadoop.mapreduce.reduce.speculative=false
-                <arg>--inputPath</arg><arg>${workingDir}/xml</arg>
+                <arg>--inputPath</arg><arg>/user/claudio.atzori/data/provision/xml_json_test</arg>
                 <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
                 <arg>--format</arg><arg>${format}</arg>
                 <arg>--batchSize</arg><arg>${batchSize}</arg>
                 <arg>--outputFormat</arg><arg>${outputFormat}</arg>
-                <arg>--outputPath</arg><arg>${workingDir}/solr_documents</arg>
+                <arg>--outputPath</arg><arg>/user/claudio.atzori/data/provision/solr_documents</arg>
+                <arg>--shouldIndex</arg><arg>${shouldIndex}</arg>
@@ -669,6 +671,7 @@
                 <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
                 <arg>--format</arg><arg>${format}</arg>
                 <arg>--action</arg><arg>COMMIT</arg>
+                <arg>--shouldIndex</arg><arg>${shouldIndex}</arg>
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
index 8800abf95..1a982ca39 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
@@ -57,7 +57,7 @@ public class EOSCFuture_Test {
                 IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")),
                 OtherResearchProduct.class);

-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+        final String xml = xmlRecordFactory.build(new JoinedEntity(p));

         assertNotNull(xml);

diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
index e72883055..8d5aa3f3a 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
@@ -63,7 +63,7 @@ public class IndexRecordTransformerTest {
         final Project pj = load("project.json", Project.class);
         final Relation rel = load("relToValidatedProject.json", Relation.class);

-        final JoinedEntity je = new JoinedEntity<>(p);
+        final JoinedEntity je = new JoinedEntity(p);
         je
             .setLinks(
                 Lists
@@ -86,7 +86,7 @@ public class IndexRecordTransformerTest {

         final Publication p = load("publication.json", Publication.class);

-        final JoinedEntity je = new JoinedEntity<>(p);
+        final JoinedEntity je = new JoinedEntity(p);
         final String record = xmlRecordFactory.build(je);
         assertNotNull(record);
         SolrInputDocument solrDoc = testRecordTransformation(record);
@@ -102,7 +102,7 @@ public class IndexRecordTransformerTest {

         final Publication p = load("riunet.json", Publication.class);

-        final JoinedEntity je = new JoinedEntity<>(p);
+        final JoinedEntity je = new JoinedEntity(p);
         final String record = xmlRecordFactory.build(je);
         assertNotNull(record);
         testRecordTransformation(record);
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/JoinedEntityTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/JoinedEntityTest.java
new file mode 100644
index 000000000..d836eddd8
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/JoinedEntityTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2024.
+ * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package eu.dnetlib.dhp.oa.provision;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+
+class JoinedEntityTest {
+
+    private static final Logger log = LoggerFactory.getLogger(JoinedEntityTest.class);
+
+    @Test
+    void test_serialisation() throws IOException {
+
+        Publication p = new Publication();
+        p.setId("p1");
+        Journal j = new Journal();
+        j.setIss("1234-5678");
+        p.setJournal(j);
+
+        Organization o = new Organization();
+        o.setId("o1");
+        Field<String> lName = new Field<>();
+        lName.setValue("CNR");
+        o.setLegalname(lName);
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
+
+        final String json = mapper.writeValueAsString(new JoinedEntity(p));
+        log.info(json);
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
index 3beca7e7e..c749781ec 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
@@ -86,7 +86,7 @@ public class SolrConfigExploreTest extends SolrExploreTest {

         String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";

-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null)
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, true, null)
             .run(isLookupClient);

         Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
index a9d885ecf..91dbf8978 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
@@ -95,7 +95,7 @@ public class SolrConfigTest extends SolrTest {

         String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
"src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; - new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null) + new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, false, null) .run(isLookupClient); Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java index a3a140cf6..be481cc37 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java @@ -1,35 +1,35 @@ package eu.dnetlib.dhp.oa.provision; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; import java.io.IOException; import java.io.StringReader; import java.net.URI; -import java.util.Map; +import java.util.Optional; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrInputField; import org.apache.solr.common.params.CommonParams; -import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.dom4j.io.SAXReader; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -70,6 +70,7 @@ public class XmlIndexingJobTest extends SolrTest { SparkConf conf = new SparkConf(); conf.setAppName(XmlIndexingJobTest.class.getSimpleName()); + conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class }); @@ -97,12 +98,15 @@ public class XmlIndexingJobTest extends SolrTest { String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; - long nRecord = JavaSparkContext - .fromSparkContext(spark.sparkContext()) - .sequenceFile(inputPath, Text.class, Text.class) - .count(); + Dataset records = spark + .read() + .schema(Encoders.bean(TupleWrapper.class).schema()) + .json(inputPath) + .as(Encoders.bean(TupleWrapper.class)); - new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null) + long nRecord = records.count(); + + new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, true, null) .run(isLookupClient); assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); @@ -137,40 +141,93 @@ public class XmlIndexingJobTest extends SolrTest { assertEquals( 0, 
+        rsp = miniCluster
+            .getSolrClient()
+            .query(
+                new SolrQuery()
+                    .add(CommonParams.Q, "objidentifier:\"iddesignpres::ae77e56e84ad058d9e7f19fa2f7325db\"")
+                    .add(CommonParams.FL, "__json"));
+
+        assertEquals(
+            1, rsp.getResults().getNumFound(),
+            "the number of indexed records having the given identifier");
+
+        Optional<Object> json = rsp
+            .getResults()
+            .stream()
+            .map(d -> d.getFieldValues("__json"))
+            .flatMap(d -> d.stream())
+            .findFirst();
+
+        assertTrue(json.isPresent());
+
+        log.info((String) json.get());
     }

     @Test
     void testXmlIndexingJob_saveOnHDFS() throws Exception {
-        final String ID_XPATH = "//header/*[local-name()='objIdentifier']";
+        final String ID_XPATH = "//*[local-name()='header']/*[local-name()='objIdentifier']";

         String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
+        // String inputPath = "/Users/claudio/workspace/data/index";

-        final JavaPairRDD<Text, Text> xmlRecords = JavaSparkContext
-            .fromSparkContext(spark.sparkContext())
-            .sequenceFile(inputPath, Text.class, Text.class);
-        long nRecord = xmlRecords.count();
-        long xmlIdUnique = xmlRecords
-            .map(t -> t._2().toString())
-            .map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH))
+        Dataset<TupleWrapper> records = spark
+            .read()
+            .schema(Encoders.bean(TupleWrapper.class).schema())
+            .json(inputPath)
+            .as(Encoders.bean(TupleWrapper.class));
+
+        records.printSchema();
+
+        long nRecord = records.count();
+        log.info("found {} records", nRecord);
+
+        final Dataset<String> ids = records
+            .map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
+            .map(
+                (MapFunction<String, String>) s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH),
+                Encoders.STRING());
+
+        log.info("found {} ids", ids.count());
+
+        long xmlIdUnique = ids
             .distinct()
             .count();
+
+        log.info("found {} unique ids", xmlIdUnique);
+
         assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");

         final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();

-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.HDFS, outputPath)
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.HDFS, false, outputPath)
             .run(isLookupClient);

         final Dataset<SerializableSolrInputDocument> solrDocs = spark
             .read()
             .load(outputPath)
             .as(Encoders.kryo(SerializableSolrInputDocument.class));
+
+        solrDocs.foreach(doc -> {
+            assertNotNull(doc.get("__result"));
+            assertNotNull(doc.get("__json"));
+        });
+
         long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
             final SolrInputField id = doc.getField("__indexrecordidentifier");
             return id.getFirstValue().toString();
         }, Encoders.STRING())
             .distinct()
             .count();
-        assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output records");
+        assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output XML records");
+
+        long jsonUnique = solrDocs
+            .map(
+                (MapFunction<SerializableSolrInputDocument, String>) je -> (String) je.getField("__json").getValue(),
+                Encoders.STRING())
+            .distinct()
+            .count();
+
+        assertEquals(jsonUnique, docIdUnique, "IDs should be unique among the output JSON records");
     }
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
index ba9572b17..f26c384d2 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
@@ -42,7 +42,7 @@ public class XmlRecordFactoryTest {
         final Publication p = OBJECT_MAPPER
             .readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);

-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+        final String xml = xmlRecordFactory.build(new JoinedEntity(p));

         assertNotNull(xml);

@@ -117,7 +117,7 @@ public class XmlRecordFactoryTest {
         final List<RelatedEntityWrapper> links = Lists.newArrayList();
         final RelatedEntityWrapper rew = new RelatedEntityWrapper(rel, relatedProject);
         links.add(rew);
-        final JoinedEntity je = new JoinedEntity<>(p);
+        final JoinedEntity je = new JoinedEntity(p);
         je.setLinks(links);

         final String xml = xmlRecordFactory.build(je);
@@ -148,7 +148,7 @@ public class XmlRecordFactoryTest {
         final List<RelatedEntityWrapper> links = Lists.newArrayList();
         final RelatedEntityWrapper rew = new RelatedEntityWrapper(rel, relatedProject);
         links.add(rew);
-        final JoinedEntity je = new JoinedEntity<>(p);
+        final JoinedEntity je = new JoinedEntity(p);
         je.setLinks(links);

         final String xml = xmlRecordFactory.build(je);
@@ -171,7 +171,7 @@ public class XmlRecordFactoryTest {
         final Datasource d = OBJECT_MAPPER
             .readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class);

-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(d));
+        final String xml = xmlRecordFactory.build(new JoinedEntity(d));

         assertNotNull(xml);

@@ -210,7 +210,7 @@ public class XmlRecordFactoryTest {
                 IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json")),
                 OtherResearchProduct.class);

-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+        final String xml = xmlRecordFactory.build(new JoinedEntity(p));

         assertNotNull(xml);

@@ -233,7 +233,7 @@ public class XmlRecordFactoryTest {
                 IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json")),
                 OtherResearchProduct.class);

-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+        final String xml = xmlRecordFactory.build(new JoinedEntity(p));

         assertNotNull(xml);

@@ -256,7 +256,7 @@ public class XmlRecordFactoryTest {
                 IOUtils.toString(getClass().getResourceAsStream("iris-odf-4.json")),
                 Publication.class);

-        final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+        final String xml = xmlRecordFactory.build(new JoinedEntity(p));

         assertNotNull(xml);

diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema
index e191c6223..d9381b58e 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema
@@ -195,186 +195,147 @@
[hunk body lost in extraction: the XML markup of the test schema's field declarations was stripped, leaving only +/- markers; the hunk replaces 186 lines of declarations with 147]
@@ -382,26 +343,20 @@
[hunk body lost in extraction: a further block of field declarations is reduced from 26 to 20 lines]
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000
deleted file mode 100644
index ff4095a11..000000000
Binary files a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 and /dev/null differ
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000.json.gz b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000.json.gz
new file mode 100644
index 000000000..8dfcea4fa
Binary files /dev/null and b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000.json.gz differ
diff --git a/pom.xml b/pom.xml
index 6ef320253..a95bf100d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -888,7 +888,7 @@
 [property tags lost in extraction; surviving context values: 3.3.3, 3.4.2, [2.12,3.0)]
-    <dhp-schemas.version>[4.17.2]</dhp-schemas.version>
+    <dhp-schemas.version>[6.0.0-SNAPSHOT]</dhp-schemas.version>
 [property tags lost in extraction; surviving context values: [4.0.3], [6.0.5], [3.1.6]]