diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala index dffc88c6c..70dcc0184 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/BioDBToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala @@ -1,4 +1,4 @@ -package eu.dnetllib.dhp.sx.bio +package eu.dnetlib.dhp.sx.bio import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala similarity index 91% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala index 16d2b25a6..7a62437a3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala @@ -1,8 +1,8 @@ -package eu.dnetllib.dhp.sx.bio +package eu.dnetlib.dhp.sx.bio import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.Oaf -import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved +import BioDBToOAF.ScholixResolved import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -13,7 +13,7 @@ object SparkTransformBioDatabaseToOAF { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() val log: Logger = LoggerFactory.getLogger(getClass) - val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/bio_to_oaf_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json"))) parser.parseArgument(args) val database: String = parser.get("database") log.info("database: {}", database) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala similarity index 98% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 17bf3fa6b..17d21f19c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -1,10 +1,10 @@ -package eu.dnetllib.dhp.sx.bio.ebi +package eu.dnetlib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.oaf.Result +import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf} import eu.dnetlib.dhp.utils.ISLookupClientFactory -import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf} import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala similarity index 95% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala index 85fbd99c4..eab6b1dc6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala @@ -1,8 +1,9 @@ -package eu.dnetllib.dhp.sx.bio.ebi +package eu.dnetlib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem -import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal} +import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal} +import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem +import eu.dnetlib.dhp.sx.bio.pubmed.PMJournal import org.apache.commons.io.IOUtils import org.apache.http.client.config.RequestConfig import org.apache.http.client.methods.HttpGet diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala similarity index 90% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index 10467884c..b19bfc23a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -1,9 +1,10 @@ -package eu.dnetllib.dhp.sx.bio.ebi +package eu.dnetlib.dhp.sx.bio.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.Oaf -import eu.dnetllib.dhp.sx.bio.BioDBToOAF -import eu.dnetllib.dhp.sx.bio.BioDBToOAF.EBILinkItem +import eu.dnetlib.dhp.sx.bio.BioDBToOAF +import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem +import BioDBToOAF.EBILinkItem import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql._ diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMArticle.java similarity index 97% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMArticle.java index 305bb89be..881528425 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMArticle.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMArticle.java @@ -1,5 +1,5 @@ -package eu.dnetllib.dhp.sx.bio.pubmed; +package eu.dnetlib.dhp.sx.bio.pubmed; import java.io.Serializable; import java.util.ArrayList; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMAuthor.java similarity index 93% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMAuthor.java index c89929981..cef92d003 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMAuthor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMAuthor.java @@ -1,5 +1,5 @@ -package eu.dnetllib.dhp.sx.bio.pubmed; +package eu.dnetlib.dhp.sx.bio.pubmed; import java.io.Serializable; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMGrant.java similarity index 94% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMGrant.java index 7df5dd5f2..ce9420cc1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMGrant.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMGrant.java @@ -1,5 +1,5 @@ -package eu.dnetllib.dhp.sx.bio.pubmed; +package eu.dnetlib.dhp.sx.bio.pubmed; public class PMGrant { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMJournal.java similarity index 95% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMJournal.java index 6065416f8..863a23bd5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMJournal.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMJournal.java @@ -1,5 +1,5 @@ -package eu.dnetllib.dhp.sx.bio.pubmed; +package eu.dnetlib.dhp.sx.bio.pubmed; import java.io.Serializable; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala index 8fa226b7d..80cb0667c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMParser.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala @@ -1,4 +1,4 @@ -package eu.dnetllib.dhp.sx.bio.pubmed +package eu.dnetlib.dhp.sx.bio.pubmed import scala.xml.MetaData import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMSubject.java similarity index 94% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMSubject.java index e6ab61b87..862d39a94 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PMSubject.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMSubject.java @@ -1,5 +1,5 @@ -package eu.dnetllib.dhp.sx.bio.pubmed; +package eu.dnetlib.dhp.sx.bio.pubmed; public class PMSubject { private String value; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala index a1777a230..13f38408e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/pubmed/PubMedToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala @@ -1,4 +1,4 @@ -package eu.dnetllib.dhp.sx.bio.pubmed +package eu.dnetlib.dhp.sx.bio.pubmed import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.common.ModelConstants diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml new file mode 100644 index 000000000..071d202b6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml @@ -0,0 +1,51 @@ + + + + sourcePath + the PDB Database Working Path + + + database + the PDB Database Working Path + + + + targetPath + the Target Working dir path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Convert Bio DB to OAF Dataset + eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --dbPath${sourcePath} + --database${database} + --targetPath${targetPath} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/bio_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/bio_to_oaf_params.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala similarity index 97% rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala index c072f149c..893a6e628 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetllib/dhp/sx/bio/BioScholixTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala @@ -1,10 +1,10 @@ -package eu.dnetllib.dhp.sx.bio +package eu.dnetlib.dhp.sx.bio import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature} import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} -import eu.dnetllib.dhp.sx.bio.BioDBToOAF.ScholixResolved -import eu.dnetllib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PubMedToOaf} +import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved +import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PubMedToOaf} import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java index 38ffd28fe..7d0d6b0b8 100644 --- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java +++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/ReadBlacklistFromDB.java @@ -91,8 +91,8 @@ public class ReadBlacklistFromDB implements Closeable { String encoding = rs.getString("relationship"); RelationInverse ri = ModelSupport.relationInverseMap.get(encoding); - direct.setRelClass(ri.getRelation()); - inverse.setRelClass(ri.getInverse()); + direct.setRelClass(ri.getRelClass()); + inverse.setRelClass(ri.getInverseRelClass()); direct.setRelType(ri.getRelType()); inverse.setRelType(ri.getRelType()); direct.setSubRelType(ri.getSubReltype()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java new file mode 100644 index 000000000..c0accd25a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -0,0 +1,134 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import com.clearspring.analytics.util.Lists; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class CopyHdfsOafApplication extends AbstractMigrationApplication { + + private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + CopyHdfsOafApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl"); + log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl); + + final String mdFormat = parser.get("mdFormat"); + log.info("mdFormat: {}", mdFormat); + + final String mdLayout = parser.get("mdLayout"); + log.info("mdLayout: {}", mdLayout); + + final String mdInterpretation = parser.get("mdInterpretation"); + log.info("mdInterpretation: {}", mdInterpretation); + + final String hdfsPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", hdfsPath); + + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); + final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService); + + final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); + + final SparkConf conf = new SparkConf(); + runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, vocs, hdfsPath, paths)); + } + + public static void processPaths(final SparkSession spark, + final VocabularyGroup vocs, + final String outputPath, + final Set paths) { + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + log.info("Found {} mdstores", paths.size()); + paths.forEach(log::info); + + final String[] validPaths = paths + .stream() + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) + .toArray(String[]::new); + log.info("Non empty mdstores {}", validPaths.length); + + if (validPaths.length > 0) { + // load the dataset + Dataset oaf = spark + .read() + .load(validPaths) + .as(Encoders.kryo(Oaf.class)); + + // dispatch each entity type individually in the respective graph subdirectory in append mode + for(Map.Entry e : ModelSupport.oafTypes.entrySet()) { + oaf + .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey())) + .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue())) + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(outputPath + "/" + e.getKey()); + } + } + } + + private static Relation getInverse(Relation rel, VocabularyGroup vocs) { + final Relation inverse = new Relation(); + + inverse.setProperties(rel.getProperties()); + inverse.setValidated(rel.getValidated()); + inverse.setValidationDate(rel.getValidationDate()); + inverse.setCollectedfrom(rel.getCollectedfrom()); + inverse.setDataInfo(rel.getDataInfo()); + inverse.setLastupdatetimestamp(rel.getLastupdatetimestamp()); + + inverse.setSource(rel.getTarget()); + inverse.setTarget(rel.getSource()); + inverse.setRelType(rel.getRelType()); + inverse.setSubRelType(rel.getSubRelType()); + + return inverse; + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index 4110bd806..6c72e4dfc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -135,30 +135,4 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication } } - private static Set mdstorePaths(final String mdstoreManagerUrl, - final String format, - final String layout, - final String interpretation) throws IOException { - final String url = mdstoreManagerUrl + "/mdstores/"; - final ObjectMapper objectMapper = new ObjectMapper(); - - final HttpGet req = new HttpGet(url); - - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - final String json = IOUtils.toString(response.getEntity().getContent()); - final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); - return Arrays - .stream(mdstores) - .filter(md -> md.getFormat().equalsIgnoreCase(format)) - .filter(md -> md.getLayout().equalsIgnoreCase(layout)) - .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) - .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) - .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) - .filter(md -> md.getSize() > 0) - .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") - .collect(Collectors.toSet()); - } - } - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 5d32fe926..7c88dbd9d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -3,8 +3,14 @@ package eu.dnetlib.dhp.oa.graph.raw.common; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -16,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Oaf; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; public class AbstractMigrationApplication implements Closeable { @@ -47,6 +57,43 @@ public class AbstractMigrationApplication implements Closeable { SequenceFile.Writer.valueClass(Text.class)); } + /** + * Retrieves from the metadata store manager application the list of paths associated with mdstores characterized + * by he given format, layout, interpretation + * @param mdstoreManagerUrl the URL of the mdstore manager service + * @param format the mdstore format + * @param layout the mdstore layout + * @param interpretation the mdstore interpretation + * @return the set of hdfs paths + * @throws IOException in case of HTTP communication issues + */ + protected static Set mdstorePaths(final String mdstoreManagerUrl, + final String format, + final String layout, + final String interpretation) throws IOException { + final String url = mdstoreManagerUrl + "/mdstores/"; + final ObjectMapper objectMapper = new ObjectMapper(); + + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String json = IOUtils.toString(response.getEntity().getContent()); + final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); + return Arrays + .stream(mdstores) + .filter(md -> md.getFormat().equalsIgnoreCase(format)) + .filter(md -> md.getLayout().equalsIgnoreCase(layout)) + .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) + .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) + .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) + .filter(md -> md.getSize() > 0) + .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") + .collect(Collectors.toSet()); + } + } + } + private Configuration getConf() { return new Configuration(); /* diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json new file mode 100644 index 000000000..1e862198f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json @@ -0,0 +1,38 @@ +[ + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the path where storing the sequential file", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "mdstoreManagerUrl", + "paramDescription": "the MdstoreManager url", + "paramRequired": true + }, + { + "paramName": "f", + "paramLongName": "mdFormat", + "paramDescription": "metadata format", + "paramRequired": true + }, + { + "paramName": "l", + "paramLongName": "mdLayout", + "paramDescription": "metadata layout", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "mdInterpretation", + "paramDescription": "metadata interpretation", + "paramRequired": true + }, + { + "paramName": "isu", + "paramLongName": "isLookupUrl", + "paramDescription": "the url of the ISLookupService", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 321ca4090..563923a5a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -544,6 +544,33 @@ --sourcePath${workingDir}/entities --graphRawPath${workingDir}/graph_raw + + + + + + + yarn + cluster + ImportOAF_hdfs_graph + eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${workingDir}/graph_raw + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatOAF + --mdLayoutstore + --mdInterpretationgraph + --isLookupUrl${isLookupUrl} + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/oozie_app/workflow.xml deleted file mode 100644 index 0df085ee1..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/bio/oozie_app/workflow.xml +++ /dev/null @@ -1,177 +0,0 @@ - - - - PDBPath - the PDB Database Working Path - - - - UNIPROTDBPath - the UNIPROT Database Working Path - - - - EBIDataset - the EBI Links Dataset Path - - - - ScholixResolvedDBPath - the Scholix Resolved Dataset Path - - - - CrossrefLinksPath - the CrossrefLinks Path - - - targetPath - the Target Working dir path - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn - cluster - Convert PDB to OAF Dataset - eu.dnetlib.dhp.sx.graph.bio.SparkTransformBioDatabaseToOAF - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --dbPath${PDBPath} - --databasePDB - --targetPath${targetPath}/pdb_OAF - - - - - - - - - yarn - cluster - Convert UNIPROT to OAF Dataset - eu.dnetlib.dhp.sx.graph.bio.SparkTransformBioDatabaseToOAF - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --dbPath${UNIPROTDBPath} - --databaseUNIPROT - --targetPath${targetPath}/uniprot_OAF - - - - - - - - - yarn - cluster - Convert EBI Links to OAF Dataset - eu.dnetlib.dhp.sx.graph.ebi.SparkEBILinksToOaf - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --sourcePath${EBIDataset} - --targetPath${targetPath}/ebi_OAF - - - - - - - - - yarn - cluster - Convert Scholix to OAF Dataset - eu.dnetlib.dhp.sx.graph.bio.SparkTransformBioDatabaseToOAF - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --dbPath${ScholixResolvedDBPath} - --databaseSCHOLIX - --targetPath${targetPath}/scholix_resolved_OAF - - - - - - - - - yarn - cluster - Convert Crossref Links to OAF Dataset - eu.dnetlib.dhp.sx.graph.bio.SparkTransformBioDatabaseToOAF - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.shuffle.partitions=2000 - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --masteryarn - --dbPath${CrossrefLinksPath} - --databaseCROSSREF_LINKS - --targetPath${targetPath}/crossref_unresolved_relation_OAF - - - - - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java index 9bc2924c3..3beca7e7e 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java @@ -25,102 +25,102 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @ExtendWith(MockitoExtension.class) public class SolrConfigExploreTest extends SolrExploreTest { - protected static SparkSession spark; + protected static SparkSession spark; - private static final Integer batchSize = 100; + private static final Integer batchSize = 100; - @Mock - private ISLookUpService isLookUpService; + @Mock + private ISLookUpService isLookUpService; - @Mock - private ISLookupClient isLookupClient; + @Mock + private ISLookupClient isLookupClient; - @BeforeEach - public void prepareMocks() throws ISLookUpException, IOException { - isLookupClient.setIsLookup(isLookUpService); + @BeforeEach + public void prepareMocks() throws ISLookUpException, IOException { + isLookupClient.setIsLookup(isLookUpService); - int solrPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort(); + int solrPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort(); - Mockito - .when(isLookupClient.getDsId(Mockito.anyString())) - .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"); - Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", solrPort)); - Mockito - .when(isLookupClient.getLayoutSource(Mockito.anyString())) - .thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml"))); - Mockito - .when(isLookupClient.getLayoutTransformer()) - .thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"))); - } + Mockito + .when(isLookupClient.getDsId(Mockito.anyString())) + .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"); + Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", solrPort)); + Mockito + .when(isLookupClient.getLayoutSource(Mockito.anyString())) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml"))); + Mockito + .when(isLookupClient.getLayoutTransformer()) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"))); + } - @BeforeAll - public static void before() { + @BeforeAll + public static void before() { - SparkConf conf = new SparkConf(); - conf.setAppName(XmlIndexingJobTest.class.getSimpleName()); - conf.registerKryoClasses(new Class[] { - SerializableSolrInputDocument.class - }); + SparkConf conf = new SparkConf(); + conf.setAppName(XmlIndexingJobTest.class.getSimpleName()); + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); - conf.setMaster("local[1]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString()); + conf.setMaster("local[1]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString()); - spark = SparkSession - .builder() - .appName(XmlIndexingJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); + spark = SparkSession + .builder() + .appName(XmlIndexingJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - } + @AfterAll + public static void tearDown() { + spark.stop(); + } - @AfterAll - public static void tearDown() { - spark.stop(); - } + @Test + public void testSolrConfig() throws Exception { - @Test - public void testSolrConfig() throws Exception { + String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; - String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; + new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null) + .run(isLookupClient); + Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); - new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null).run(isLookupClient); - Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); + String[] queryStrings = { + "cancer", + "graph", + "graphs" + }; - String[] queryStrings = { - "cancer", - "graph", - "graphs" - }; + for (String q : queryStrings) { + SolrQuery query = new SolrQuery(); + query.setRequestHandler("/exploreSearch"); + query.add(CommonParams.Q, q); + query.set("debugQuery", "on"); - for (String q : queryStrings) { - SolrQuery query = new SolrQuery(); - query.setRequestHandler("/exploreSearch"); - query.add(CommonParams.Q, q); - query.set("debugQuery", "on"); - - log.info("Submit query to Solr with params: {}", query.toString()); - QueryResponse rsp = miniCluster.getSolrClient().query(query); + log.info("Submit query to Solr with params: {}", query.toString()); + QueryResponse rsp = miniCluster.getSolrClient().query(query); // System.out.println(rsp.getHighlighting()); // System.out.println(rsp.getExplainMap()); - for (SolrDocument doc : rsp.getResults()) { - System.out.println( - doc.get("score") + "\t" + - doc.get("__indexrecordidentifier") + "\t" + - doc.get("resultidentifier") + "\t" + - doc.get("resultauthor") + "\t" + - doc.get("resultacceptanceyear") + "\t" + - doc.get("resultsubject") + "\t" + - doc.get("resulttitle") + "\t" + - doc.get("relprojectname") + "\t" + - doc.get("resultdescription") + "\t" + - doc.get("__all") + "\t" - ); - } - } - } + for (SolrDocument doc : rsp.getResults()) { + System.out + .println( + doc.get("score") + "\t" + + doc.get("__indexrecordidentifier") + "\t" + + doc.get("resultidentifier") + "\t" + + doc.get("resultauthor") + "\t" + + doc.get("resultacceptanceyear") + "\t" + + doc.get("resultsubject") + "\t" + + doc.get("resulttitle") + "\t" + + doc.get("relprojectname") + "\t" + + doc.get("resultdescription") + "\t" + + doc.get("__all") + "\t"); + } + } + } } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java index e20ecf152..ab98b1da2 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java @@ -34,98 +34,98 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @ExtendWith(MockitoExtension.class) public class SolrConfigTest extends SolrTest { - protected static SparkSession spark; + protected static SparkSession spark; - private static final Integer batchSize = 100; + private static final Integer batchSize = 100; - @Mock - private ISLookUpService isLookUpService; + @Mock + private ISLookUpService isLookUpService; - @Mock - private ISLookupClient isLookupClient; + @Mock + private ISLookupClient isLookupClient; - @BeforeEach - public void prepareMocks() throws ISLookUpException, IOException { - isLookupClient.setIsLookup(isLookUpService); + @BeforeEach + public void prepareMocks() throws ISLookUpException, IOException { + isLookupClient.setIsLookup(isLookUpService); - int solrPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort(); + int solrPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort(); - Mockito - .when(isLookupClient.getDsId(Mockito.anyString())) - .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"); - Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", solrPort)); - Mockito - .when(isLookupClient.getLayoutSource(Mockito.anyString())) - .thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml"))); - Mockito - .when(isLookupClient.getLayoutTransformer()) - .thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"))); - } + Mockito + .when(isLookupClient.getDsId(Mockito.anyString())) + .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"); + Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", solrPort)); + Mockito + .when(isLookupClient.getLayoutSource(Mockito.anyString())) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml"))); + Mockito + .when(isLookupClient.getLayoutTransformer()) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"))); + } - @BeforeAll - public static void before() { + @BeforeAll + public static void before() { - SparkConf conf = new SparkConf(); - conf.setAppName(XmlIndexingJobTest.class.getSimpleName()); - conf.registerKryoClasses(new Class[] { - SerializableSolrInputDocument.class - }); + SparkConf conf = new SparkConf(); + conf.setAppName(XmlIndexingJobTest.class.getSimpleName()); + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); - conf.setMaster("local[1]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString()); + conf.setMaster("local[1]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString()); - spark = SparkSession - .builder() - .appName(XmlIndexingJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); + spark = SparkSession + .builder() + .appName(XmlIndexingJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - } + @AfterAll + public static void tearDown() { + spark.stop(); + } - @AfterAll - public static void tearDown() { - spark.stop(); - } + @Test + public void testSolrConfig() throws Exception { - @Test - public void testSolrConfig() throws Exception { + String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; - String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; + new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null) + .run(isLookupClient); + Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); - new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null).run(isLookupClient); - Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); + String[] queryStrings = { + "cancer", + "graph", + "graphs" + }; - String[] queryStrings = { - "cancer", - "graph", - "graphs" - }; + for (String q : queryStrings) { + SolrQuery query = new SolrQuery(); + query.add(CommonParams.Q, q); - for (String q : queryStrings) { - SolrQuery query = new SolrQuery(); - query.add(CommonParams.Q, q); + log.info("Submit query to Solr with params: {}", query.toString()); + QueryResponse rsp = miniCluster.getSolrClient().query(query); - log.info("Submit query to Solr with params: {}", query.toString()); - QueryResponse rsp = miniCluster.getSolrClient().query(query); - - for (SolrDocument doc : rsp.getResults()) { - System.out.println( - doc.get("score") + "\t" + - doc.get("__indexrecordidentifier") + "\t" + - doc.get("resultidentifier") + "\t" + - doc.get("resultauthor") + "\t" + - doc.get("resultacceptanceyear") + "\t" + - doc.get("resultsubject") + "\t" + - doc.get("resulttitle") + "\t" + - doc.get("relprojectname") + "\t" + - doc.get("resultdescription") + "\t" + - doc.get("__all") + "\t" - ); - } - } - } + for (SolrDocument doc : rsp.getResults()) { + System.out + .println( + doc.get("score") + "\t" + + doc.get("__indexrecordidentifier") + "\t" + + doc.get("resultidentifier") + "\t" + + doc.get("resultauthor") + "\t" + + doc.get("resultacceptanceyear") + "\t" + + doc.get("resultsubject") + "\t" + + doc.get("resulttitle") + "\t" + + doc.get("relprojectname") + "\t" + + doc.get("resultdescription") + "\t" + + doc.get("__all") + "\t"); + } + } + } } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrExploreTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrExploreTest.java index b86fd8ac8..34a9465a7 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrExploreTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrExploreTest.java @@ -23,87 +23,87 @@ import org.slf4j.LoggerFactory; public abstract class SolrExploreTest { - protected static final Logger log = LoggerFactory.getLogger(SolrTest.class); + protected static final Logger log = LoggerFactory.getLogger(SolrTest.class); - protected static final String FORMAT = "test"; - protected static final String DEFAULT_COLLECTION = FORMAT + "-index-openaire"; - protected static final String CONFIG_NAME = "testConfig"; + protected static final String FORMAT = "test"; + protected static final String DEFAULT_COLLECTION = FORMAT + "-index-openaire"; + protected static final String CONFIG_NAME = "testConfig"; - protected static MiniSolrCloudCluster miniCluster; + protected static MiniSolrCloudCluster miniCluster; - @TempDir - public static Path workingDir; + @TempDir + public static Path workingDir; - @BeforeAll - public static void setup() throws Exception { + @BeforeAll + public static void setup() throws Exception { - // random unassigned HTTP port - final int jettyPort = 0; - final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build(); + // random unassigned HTTP port + final int jettyPort = 0; + final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build(); - log.info(String.format("working directory: %s", workingDir.toString())); - System.setProperty("solr.log.dir", workingDir.resolve("logs").toString()); + log.info(String.format("working directory: %s", workingDir.toString())); + System.setProperty("solr.log.dir", workingDir.resolve("logs").toString()); - // create a MiniSolrCloudCluster instance - miniCluster = new MiniSolrCloudCluster(2, workingDir.resolve("solr"), jettyConfig); + // create a MiniSolrCloudCluster instance + miniCluster = new MiniSolrCloudCluster(2, workingDir.resolve("solr"), jettyConfig); - // Upload Solr configuration directory to ZooKeeper - String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig"; - File configDir = new File(solrZKConfigDir); + // Upload Solr configuration directory to ZooKeeper + String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig"; + File configDir = new File(solrZKConfigDir); - miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME); + miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME); - // override settings in the solrconfig include - System.setProperty("solr.tests.maxBufferedDocs", "100000"); - System.setProperty("solr.tests.maxIndexingThreads", "-1"); - System.setProperty("solr.tests.ramBufferSizeMB", "100"); + // override settings in the solrconfig include + System.setProperty("solr.tests.maxBufferedDocs", "100000"); + System.setProperty("solr.tests.maxIndexingThreads", "-1"); + System.setProperty("solr.tests.ramBufferSizeMB", "100"); - // use non-test classes so RandomizedRunner isn't necessary - System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler"); - System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory"); - System.setProperty("solr.lock.type", "single"); + // use non-test classes so RandomizedRunner isn't necessary + System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler"); + System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory"); + System.setProperty("solr.lock.type", "single"); - log.info(new ConfigSetAdminRequest.List().process(miniCluster.getSolrClient()).toString()); - log - .info( - CollectionAdminRequest.ClusterStatus - .getClusterStatus() - .process(miniCluster.getSolrClient()) - .toString()); + log.info(new ConfigSetAdminRequest.List().process(miniCluster.getSolrClient()).toString()); + log + .info( + CollectionAdminRequest.ClusterStatus + .getClusterStatus() + .process(miniCluster.getSolrClient()) + .toString()); - NamedList res = createCollection( - miniCluster.getSolrClient(), DEFAULT_COLLECTION, 4, 2, 20, CONFIG_NAME); - res.forEach(o -> log.info(o.toString())); + NamedList res = createCollection( + miniCluster.getSolrClient(), DEFAULT_COLLECTION, 4, 2, 20, CONFIG_NAME); + res.forEach(o -> log.info(o.toString())); - miniCluster.getSolrClient().setDefaultCollection(DEFAULT_COLLECTION); + miniCluster.getSolrClient().setDefaultCollection(DEFAULT_COLLECTION); - log - .info( - CollectionAdminRequest.ClusterStatus - .getClusterStatus() - .process(miniCluster.getSolrClient()) - .toString()); + log + .info( + CollectionAdminRequest.ClusterStatus + .getClusterStatus() + .process(miniCluster.getSolrClient()) + .toString()); - } + } - @AfterAll - public static void shutDown() throws Exception { - miniCluster.shutdown(); - FileUtils.deleteDirectory(workingDir.toFile()); - } + @AfterAll + public static void shutDown() throws Exception { + miniCluster.shutdown(); + FileUtils.deleteDirectory(workingDir.toFile()); + } - protected static NamedList createCollection(CloudSolrClient client, String name, int numShards, - int replicationFactor, int maxShardsPerNode, String configName) throws Exception { - ModifiableSolrParams modParams = new ModifiableSolrParams(); - modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name()); - modParams.set("name", name); - modParams.set("numShards", numShards); - modParams.set("replicationFactor", replicationFactor); - modParams.set("collection.configName", configName); - modParams.set("maxShardsPerNode", maxShardsPerNode); - QueryRequest request = new QueryRequest(modParams); - request.setPath("/admin/collections"); - return client.request(request); - } + protected static NamedList createCollection(CloudSolrClient client, String name, int numShards, + int replicationFactor, int maxShardsPerNode, String configName) throws Exception { + ModifiableSolrParams modParams = new ModifiableSolrParams(); + modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name()); + modParams.set("name", name); + modParams.set("numShards", numShards); + modParams.set("replicationFactor", replicationFactor); + modParams.set("collection.configName", configName); + modParams.set("maxShardsPerNode", maxShardsPerNode); + QueryRequest request = new QueryRequest(modParams); + request.setPath("/admin/collections"); + return client.request(request); + } } diff --git a/pom.xml b/pom.xml index 3e8d6bc19..02bc5d8d4 100644 --- a/pom.xml +++ b/pom.xml @@ -753,7 +753,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.8.20] + [2.8.21] [4.0.3] [6.0.5] [3.1.6]