diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
index c29614d33..ad84973c3 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
@@ -142,6 +142,21 @@ object DataciteToOAFTransformation {
     }
   }
 
+  /** Uses the dnet:publication_resource vocabulary to find a synonym for one of the given
+    * terms and derive the instance.type. The instance.type synonym is then looked up in
+    * the dnet:result_typologies vocabulary to generate one of the following main
+    * entities:
+    *   - publication
+    *   - dataset
+    *   - software
+    *   - otherresearchproduct
+    *
+    * @param resourceType
+    * @param resourceTypeGeneral
+    * @param schemaOrg
+    * @param vocabularies
+    * @return
+    */
   def getTypeQualifier(
     resourceType: String,
     resourceTypeGeneral: String,
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
index 0033978bf..d4b256b66 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
@@ -23,7 +23,7 @@ public class SolrAdminApplication implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(SolrAdminApplication.class);
 
     enum Action {
-        DELETE_BY_QUERY, COMMIT
+        DELETE_BY_QUERY, COMMIT, CREATE
     }
 
     private final CloudSolrClient solrClient;
@@ -56,6 +56,8 @@ public class SolrAdminApplication implements Closeable {
 
         final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
 
+        final String fields = isLookup.getLayoutSource(format);
+
         final String zkHost = isLookup.getZkHost();
         log.info("zkHost: {}", zkHost);
 
@@ -63,7 +65,7 @@ public class SolrAdminApplication implements Closeable {
         log.info("collection: {}", collection);
 
         try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
-            app.execute(action, collection, query, commit);
+            app.execute(action, collection, query, commit, fields);
         }
     }
 
@@ -73,10 +75,10 @@ public class SolrAdminApplication implements Closeable {
     }
 
     public SolrResponse commit(String collection) throws IOException, SolrServerException {
-        return execute(Action.COMMIT, collection, null, true);
+        return execute(Action.COMMIT, collection, null, true, null);
     }
 
-    public SolrResponse execute(Action action, String collection, String query, boolean commit)
+    public SolrResponse execute(Action action, String collection, String query, boolean commit, final String fields)
         throws IOException, SolrServerException {
 
         switch (action) {
@@ -88,6 +90,12 @@ public class SolrAdminApplication implements Closeable {
             return rsp;
         case COMMIT:
             return solrClient.commit(collection);
+        case CREATE:
+            SolrUtil
+                .uploadZookeperConfig(this.solrClient.getZkStateReader().getZkClient(), collection, true, fields);
+            SolrUtil.createCollection(this.solrClient, collection, 48, 1, 12, collection);
+            // return the commit response, so that CREATE does not fall through into the default branch
+            return solrClient.commit(collection);
         default:
             throw new IllegalArgumentException("action not managed: " + action);
         }
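Note: the new CREATE action first uploads the collection configuration to ZooKeeper and then creates the collection; the shard layout (48 shards, replication factor 1, 12 max shards per node) is hard-coded above. A minimal usage sketch, assuming zkHost, collection and the fields layout source (as returned by isLookup.getLayoutSource(format)) are already available:

    // sketch: create a new collection for a given metadata format
    try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
        app.execute(SolrAdminApplication.Action.CREATE, collection, null, false, fields);
    }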
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrUtil.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrUtil.java
index 80d0fcd68..e75d1ac38 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrUtil.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrUtil.java
@@ -20,6 +20,7 @@ import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerFactory;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -179,7 +180,7 @@ public class SolrUtil {
     }
 
     public static NamedList createCollection(CloudSolrClient client, String name, int numShards,
-        int replicationFactor, int maxShardsPerNode, String configName) throws Exception {
+        int replicationFactor, int maxShardsPerNode, String configName) throws SolrServerException, IOException {
         ModifiableSolrParams modParams = new ModifiableSolrParams();
         modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
         modParams.set("name", name);
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/scholix/ScholixToSolr.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/scholix/ScholixToSolr.java
index 7708242e3..3b66bc097 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/scholix/ScholixToSolr.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/scholix/ScholixToSolr.java
@@ -2,26 +2,26 @@
 package eu.dnetlib.dhp.oa.provision.scholix;
 
 import java.io.IOException;
-import java.time.LocalDate;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.spark.api.java.function.MapFunction;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
 import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
 import eu.dnetlib.dhp.schema.sx.scholix.*;
 
-public class ScholixToSolr {
+public class ScholixToSolr implements MapFunction<String, SerializableSolrInputDocument> {
     final static ObjectMapper MAPPER = new ObjectMapper();
 
-    public static SolrInputDocument toSolrDocument(final String json) {
+    public static SerializableSolrInputDocument toSolrDocument(final String json) {
         try {
             final Scholix input = MAPPER.readValue(json, Scholix.class);
-            final SolrInputDocument output = new SolrInputDocument();
+            final SerializableSolrInputDocument output = new SerializableSolrInputDocument();
 
             fillEntityField(output, input.getSource(), "source");
             fillEntityField(output, input.getTarget(), "target");
@@ -66,7 +66,7 @@ public class ScholixToSolr {
         }
     }
 
-    private static void fillEntityField(final SolrInputDocument document, final ScholixResource resource,
+    private static void fillEntityField(final SerializableSolrInputDocument document, final ScholixResource resource,
         final String prefix) {
 
         document.addField(prefix + "_identifier", resource.getDnetIdentifier());
@@ -114,4 +114,8 @@ public class ScholixToSolr {
 
     }
 
+    @Override
+    public SerializableSolrInputDocument call(String s) throws Exception {
+        return toSolrDocument(s);
+    }
 }
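Note: implementing Spark's MapFunction lets the same converter be used both as a static utility and directly inside a Dataset pipeline. A minimal sketch (the json variable and the jsonRecords Dataset&lt;String&gt; are hypothetical):

    // sketch: convert a single Scholix JSON record
    SerializableSolrInputDocument doc = ScholixToSolr.toSolrDocument(json);

    // sketch: convert a whole Dataset<String> of JSON records, keeping kryo serialization
    Dataset<SerializableSolrInputDocument> docs = jsonRecords
        .map(new ScholixToSolr(), Encoders.kryo(SerializableSolrInputDocument.class));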
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnSOLR.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnSOLR.java
new file mode 100644
index 000000000..712178719
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnSOLR.java
@@ -0,0 +1,97 @@
+
+package eu.dnetlib.dhp.sx.provision;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.lucidworks.spark.util.SolrSupport;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.scholix.ScholixToSolr;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+
+public class SparkIndexCollectionOnSOLR {
+
+    private static final Integer DEFAULT_BATCH_SIZE = 1000;
+
+    private static final Logger log = LoggerFactory.getLogger(SparkIndexCollectionOnSOLR.class);
+
+    public static void main(String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    Objects
+                        .requireNonNull(
+                            SparkIndexCollectionOnSOLR.class
+                                .getResourceAsStream("/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json"))));
+        // parse the CLI arguments, otherwise parser.get(...) would return nothing
+        parser.parseArgument(args);
+
+        final String cluster = parser.get("cluster");
+        log.info("Cluster is {}", cluster);
+
+        final String format = parser.get("format");
+        log.info("Index format name is {}", format);
+
+        final String isLookupUrl = parser.get("isLookupUrl");
+        log.info("isLookupUrl is {}", isLookupUrl);
+
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath: {}", inputPath);
+
+        final Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final Integer batchSize = Optional
+            .ofNullable(parser.get("batchSize"))
+            .map(Integer::valueOf)
+            .orElse(DEFAULT_BATCH_SIZE);
+        log.info("batchSize: {}", batchSize);
+
+        final SparkConf conf = new SparkConf();
+        conf.registerKryoClasses(new Class[] {
+            SerializableSolrInputDocument.class
+        });
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                final ISLookupClient isLookupClient = new ISLookupClient(
+                    ISLookupClientFactory.getLookUpService(isLookupUrl));
+                final String zkHost = isLookupClient.getZkHost();
+                log.info("zkHost: {}", zkHost);
+                feedScholixToSOLRIndex(spark, inputPath, format, batchSize, zkHost);
+            });
+    }
+
+    public static void feedScholixToSOLRIndex(final SparkSession spark, final String inputPath,
+        final String collection, final Integer batchSize, final String zkHost) {
+        final JavaRDD<SolrInputDocument> docs = spark
+            .read()
+            .text(inputPath)
+            .as(Encoders.STRING())
+            .map(new ScholixToSolr(), Encoders.kryo(SerializableSolrInputDocument.class))
+            .toJavaRDD()
+            // upcast to SolrInputDocument, the element type expected by SolrSupport.indexDocs
+            .map(doc -> (SolrInputDocument) doc);
+        SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+    }
+}
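Note: the job's CLI contract is defined by index_solr_parameters.json below; each paramName/paramLongName pair becomes an accepted option. A hypothetical invocation sketch (all values made up for illustration):

    // sketch: feed a folder of Scholix JSON records into the "SMF" index
    SparkIndexCollectionOnSOLR.main(new String[] {
        "--cluster", "cluster1",
        "--isLookupUrl", "http://services.example.org/isLookUp",
        "--inputPath", "/user/scholix/provision/scholix_json",
        "--format", "SMF",
        "--batchSize", "1000"
    });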
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json
new file mode 100644
index 000000000..80987de23
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json
@@ -0,0 +1,32 @@
+[
+  {
+    "paramName":"c",
+    "paramLongName":"cluster",
+    "paramDescription":"should be cluster1 or cluster2",
+    "paramRequired":true
+  },
+  {
+    "paramName":"is",
+    "paramLongName":"isLookupUrl",
+    "paramDescription":"the Information Service LookUp URL",
+    "paramRequired":true
+  },
+  {
+    "paramName":"ip",
+    "paramLongName":"inputPath",
+    "paramDescription":"the source input path",
+    "paramRequired":true
+  },
+  {
+    "paramName":"b",
+    "paramLongName":"batchSize",
+    "paramDescription":"the number of documents sent to the index in each batch (default 1000)",
+    "paramRequired":false
+  },
+  {
+    "paramName":"f",
+    "paramLongName":"format",
+    "paramDescription":"index metadata format name",
+    "paramRequired":true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/solr/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/solr/provision/oozie_app/config-default.xml
new file mode 100644
index 000000000..7c1a43e51
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/solr/provision/oozie_app/config-default.xml
@@ -0,0 +1,14 @@
+<configuration>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/solr/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/solr/provision/oozie_app/workflow.xml
new file mode 100644
index 000000000..2d46f9f34
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/solr/provision/oozie_app/workflow.xml
@@ -0,0 +1,111 @@
+<workflow-app name="Index_Scholix_Infospace" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the sourcePath of the json RDDs</description>
+        </property>
+        <property>
+            <name>isLookupUrl</name>
+            <description>URL for the isLookup service</description>
+        </property>
+        <property>
+            <name>solrDeletionQuery</name>
+            <value>*:*</value>
+            <description>query used in the delete by query operation</description>
+        </property>
+        <property>
+            <name>format</name>
+            <description>metadata format name (SMF)</description>
+        </property>
+    </parameters>
+
+    <start to="DeleteIndex"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="DeleteIndex">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            <arg>--format</arg><arg>${format}</arg>
+            <arg>--action</arg><arg>DELETE_BY_QUERY</arg>
+            <arg>--query</arg><arg>${solrDeletionQuery}</arg>
+            <arg>--commit</arg><arg>true</arg>
+        </java>
+        <ok to="CreateIndex"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="CreateIndex">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            <arg>--format</arg><arg>${format}</arg>
+            <arg>--action</arg><arg>CREATE</arg>
+        </java>
+        <ok to="indexScholix"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="indexScholix">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Index summary</name>
+            <class>eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnSOLR</class>
+            <jar>dhp-graph-provision-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --conf spark.dynamicAllocation.maxExecutors="8"
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--cluster</arg><arg>yarn</arg>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            <arg>--inputPath</arg><arg>${sourcePath}</arg>
+            <arg>--format</arg><arg>${format}</arg>
+        </spark>
+        <ok to="commitIndex"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="commitIndex">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            <arg>--format</arg><arg>${format}</arg>
+            <arg>--action</arg><arg>COMMIT</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
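Note: the workflow drives SolrAdminApplication through the full index lifecycle: DELETE_BY_QUERY to empty the collection, CREATE to (re)create it, the Spark job to feed the documents, and COMMIT to make them searchable. A minimal sketch of the same sequence driven directly from Java, assuming zkHost, collection and fields are available:

    try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
        app.execute(SolrAdminApplication.Action.DELETE_BY_QUERY, collection, "*:*", true, null);
        app.execute(SolrAdminApplication.Action.CREATE, collection, null, false, fields);
        // ... run SparkIndexCollectionOnSOLR here to feed the documents ...
        app.execute(SolrAdminApplication.Action.COMMIT, collection, null, true, null);
    }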
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
index 3e8a35fe1..bcd6d78e8 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
@@ -22,7 +22,7 @@ class SolrAdminApplicationTest extends SolrTest {
         SolrAdminApplication admin = new SolrAdminApplication(miniCluster.getSolrClient().getZkHost());
 
         UpdateResponse rsp = (UpdateResponse) admin
-            .execute(SolrAdminApplication.Action.DELETE_BY_QUERY, DEFAULT_COLLECTION, "*:*", false);
+            .execute(SolrAdminApplication.Action.DELETE_BY_QUERY, DEFAULT_COLLECTION, "*:*", false, null);
 
         assertEquals(0, rsp.getStatus());
     }
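Note: the CREATE action is not yet exercised by the test suite. A possible follow-up test against the same mini-cluster, sketched under the assumption that a valid layout source is available (the fields value below is a hypothetical placeholder; the real one comes from isLookup.getLayoutSource(format)):

    @Test
    void testCreateCollection() throws Exception {
        SolrAdminApplication admin = new SolrAdminApplication(miniCluster.getSolrClient().getZkHost());
        String fields = "<LAYOUT>...</LAYOUT>"; // hypothetical layout source
        SolrResponse rsp = admin.execute(SolrAdminApplication.Action.CREATE, "test_collection", null, false, fields);
        assertNotNull(rsp);
    }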