- Implemented Oozie workflows and Java classes to feed the Solr index

This commit is contained in:
Sandro La Bruzzo 2022-10-11 10:58:17 +02:00
parent 7784b3d9c4
commit 4b8739e45b
9 changed files with 293 additions and 12 deletions

View File

@@ -142,6 +142,21 @@ object DataciteToOAFTransformation {
}
}
/**
* Uses the dnet:publication_resource vocabulary to find a synonym for one of the given terms and derive the instance.type.
* The instance.type synonym is then looked up in the dnet:result_typologies vocabulary
* to determine one of the following main entities:
* - publication
* - dataset
* - software
* - otherresearchproduct
*
* @param resourceType the Datacite resourceType value
* @param resourceTypeGeneral the Datacite resourceTypeGeneral value
* @param schemaOrg the schema.org type declared in the record
* @param vocabularies the vocabularies used to resolve the type terms
* @return the resolved type qualifiers
*/
def getTypeQualifier(
resourceType: String,
resourceTypeGeneral: String,
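A hedged sketch (in Java) of the two-step lookup described in the comment above; the getSynonymAsQualifier(vocabularyId, term) helper and the Qualifier type are assumed to match the dnet vocabulary utilities, while the vocabulary ids come from the comment itself:

// Illustrative fragment only: resolve the instance type via dnet:publication_resource,
// then map it to the main result type via dnet:result_typologies.
Qualifier instanceType = vocabularies
    .getSynonymAsQualifier("dnet:publication_resource", resourceType);
Qualifier resultType = vocabularies
    .getSynonymAsQualifier("dnet:result_typologies", instanceType.getClassid());
// resultType.getClassid() is then one of: publication, dataset, software, otherresearchproduct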

View File

@@ -23,7 +23,7 @@ public class SolrAdminApplication implements Closeable {
private static final Logger log = LoggerFactory.getLogger(SolrAdminApplication.class);
enum Action {
DELETE_BY_QUERY, COMMIT
DELETE_BY_QUERY, COMMIT, CREATE
}
private final CloudSolrClient solrClient;
@@ -56,6 +56,8 @@ public class SolrAdminApplication implements Closeable {
final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
final String fields = isLookup.getLayoutSource(format);
final String zkHost = isLookup.getZkHost();
log.info("zkHost: {}", zkHost);
@@ -63,7 +65,7 @@ public class SolrAdminApplication implements Closeable {
log.info("collection: {}", collection);
try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
app.execute(action, collection, query, commit);
app.execute(action, collection, query, commit, fields);
}
}
@@ -73,10 +75,10 @@ public class SolrAdminApplication implements Closeable {
}
public SolrResponse commit(String collection) throws IOException, SolrServerException {
return execute(Action.COMMIT, collection, null, true);
return execute(Action.COMMIT, collection, null, true, null);
}
public SolrResponse execute(Action action, String collection, String query, boolean commit)
public SolrResponse execute(Action action, String collection, String query, boolean commit, final String fields)
throws IOException, SolrServerException {
switch (action) {
@@ -88,6 +90,11 @@ public class SolrAdminApplication implements Closeable {
return rsp;
case COMMIT:
return solrClient.commit(collection);
case CREATE:
SolrUtil
.uploadZookeperConfig(this.solrClient.getZkStateReader().getZkClient(), collection, true, fields);
SolrUtil.createCollection(this.solrClient, collection, 48, 1, 12, collection);
// return a response for the newly created collection (avoids falling through to the default branch)
return solrClient.commit(collection);
default:
throw new IllegalArgumentException("action not managed: " + action);
}
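A hedged usage sketch of the new CREATE action, using the constructor and the extended execute signature shown above; the ZooKeeper address, collection name and layout fields are placeholders:

// Illustrative only: create a collection using the layout fields fetched from the IS LookUp.
String zkHost = "localhost:9983";   // placeholder ZooKeeper address
String fields = "...";              // placeholder layout, e.g. the value of isLookup.getLayoutSource(format)
try (SolrAdminApplication admin = new SolrAdminApplication(zkHost)) {
    admin.execute(SolrAdminApplication.Action.CREATE, "SMF-index-scholix", null, false, fields);
}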

View File

@@ -20,6 +20,7 @@ import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -179,7 +180,7 @@ public class SolrUtil {
}
public static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
int replicationFactor, int maxShardsPerNode, String configName) throws Exception {
int replicationFactor, int maxShardsPerNode, String configName) throws SolrServerException, IOException {
ModifiableSolrParams modParams = new ModifiableSolrParams();
modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
modParams.set("name", name);
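For reference, a call sketch against the updated signature; the client is assumed to be an already configured CloudSolrClient, and the shard, replica and config-set values mirror the ones used by the CREATE action above:

// Illustrative only: 48 shards, replication factor 1, at most 12 shards per node,
// reusing the collection name as the name of the uploaded config set.
NamedList<Object> response = SolrUtil
    .createCollection(client, "SMF-index-scholix", 48, 1, 12, "SMF-index-scholix");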

View File

@@ -2,26 +2,26 @@
package eu.dnetlib.dhp.oa.provision.scholix;
import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.function.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.schema.sx.scholix.*;
public class ScholixToSolr {
public class ScholixToSolr implements MapFunction<String, SolrInputDocument> {
final static ObjectMapper MAPPER = new ObjectMapper();
public static SolrInputDocument toSolrDocument(final String json) {
public static SerializableSolrInputDocument toSolrDocument(final String json) {
try {
final Scholix input = MAPPER.readValue(json, Scholix.class);
final SolrInputDocument output = new SolrInputDocument();
final SerializableSolrInputDocument output = new SerializableSolrInputDocument();
fillEntityField(output, input.getSource(), "source");
fillEntityField(output, input.getTarget(), "target");
@@ -66,7 +66,7 @@ public class ScholixToSolr {
}
}
private static void fillEntityField(final SolrInputDocument document, final ScholixResource resource,
private static void fillEntityField(final SerializableSolrInputDocument document, final ScholixResource resource,
final String prefix) {
document.addField(prefix + "_identifier", resource.getDnetIdentifier());
@@ -114,4 +114,8 @@ public class ScholixToSolr {
}
@Override
public SerializableSolrInputDocument call(String s) throws Exception {
return toSolrDocument(s);
}
}
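A minimal usage sketch of the converter; the JSON string is a placeholder for a serialized Scholix record:

// Illustrative only: map a single Scholix JSON line to a Solr input document.
String scholixJson = "...";  // placeholder Scholix JSON
SerializableSolrInputDocument doc = ScholixToSolr.toSolrDocument(scholixJson);
// the document carries the source_* / target_* fields filled by fillEntityField(...)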

View File

@@ -0,0 +1,97 @@
package eu.dnetlib.dhp.sx.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
import eu.dnetlib.dhp.oa.provision.scholix.ScholixToSolr;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
public class SparkIndexCollectionOnSOLR {
private static final Integer DEFAULT_BATCH_SIZE = 1000;
// LOGGER initialized
private static final Logger log = LoggerFactory.getLogger(SparkIndexCollectionOnSOLR.class);
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
SparkIndexCollectionOnSOLR.class
.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json"))));
// parse the command line arguments described in index_solr_parameters.json
parser.parseArgument(args);
final String cluster = parser.get("cluster");
log.info("Cluster is {}", cluster);
final String format = parser.get("format");
log.info("Index format name is {}", format);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl is {}", isLookupUrl);
final String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final Integer batchSize = Optional
.ofNullable(parser.get("batchSize"))
.map(Integer::valueOf)
.orElse(DEFAULT_BATCH_SIZE);
log.info("batchSize: {}", batchSize);
final SparkConf conf = new SparkConf();
conf.registerKryoClasses(new Class[] {
SerializableSolrInputDocument.class
});
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
final ISLookupClient isLookupClient = new ISLookupClient(
ISLookupClientFactory.getLookUpService(isLookupUrl));
final String zkHost = isLookupClient.getZkHost();
log.info("zkHost: {}", zkHost);
feedScholixToSOLRIndex(spark, inputPath, format, batchSize, zkHost);
});
}
public static void feedScholixToSOLRIndex(final SparkSession spark, final String inputPath, final String collection,
Integer batchSize, final String zkHost) {
final JavaRDD<SolrInputDocument> docs = spark
.read()
.text(inputPath)
.as(Encoders.STRING())
.map(new ScholixToSolr(), Encoders.kryo(SolrInputDocument.class))
.toJavaRDD();
SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
}
}
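A hedged local usage sketch of the new entry point; the input path, collection name, batch size and ZooKeeper address are placeholders, and a reachable SOLR cloud is assumed:

// Illustrative only: feed Scholix JSON records from a local path into a SOLR collection.
SparkSession spark = SparkSession
    .builder()
    .appName("feed-scholix-to-solr-test")
    .master("local[*]")
    .getOrCreate();
SparkIndexCollectionOnSOLR
    .feedScholixToSOLRIndex(spark, "/tmp/scholix_json", "SMF-index-scholix", 1000, "localhost:9983");
spark.stop();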

View File

@@ -0,0 +1,32 @@
[
{
"paramName":"c",
"paramLongName":"cluster",
"paramDescription":"should be cluster1 or cluster2",
"paramRequired":true
},
{
"paramName":"is",
"paramLongName":"isLookupUrl",
"paramDescription":"the Information Service LookUp URL",
"paramRequired":true
},
{
"paramName":"ip",
"paramLongName":"inputPath",
"paramDescription":"the source input path",
"paramRequired":true
},
{
"paramName":"b",
"paramLongName":"batchSize",
"paramDescription":"the number of documents sent to the index in each batch",
"paramRequired":false
},
{
"paramName":"f",
"paramLongName":"format",
"paramDescription":"index metadata format name",
"paramRequired":true
}
]
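An illustrative argument list matching the parameter definitions above; all values are placeholders:

--cluster cluster1 --isLookupUrl http://services.example.org/isLookUp?wsdl --inputPath /user/scholix/json --batchSize 1000 --format SMF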

View File

@@ -0,0 +1,14 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,111 @@
<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path of the Scholix JSON records to index</description>
</property>
<property>
<name>isLookupUrl</name>
<description>URL for the isLookup service</description>
</property>
<property>
<name>solrDeletionQuery</name>
<value>*:*</value>
<description>query used in the delete-by-query operation</description>
</property>
<property>
<name>format</name>
<description>metadata format name (SMF)</description>
</property>
</parameters>
<start to="create_solr_index"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="drop_solr_collection">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--action</arg><arg>DELETE_BY_QUERY</arg>
<arg>--query</arg><arg>${solrDeletionQuery}</arg>
<arg>--commit</arg><arg>true</arg>
</java>
<ok to="create_solr_index"/>
<error to="Kill"/>
</action>
<action name="create_solr_index">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--action</arg><arg>CREATE</arg>
</java>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Index Scholix records</name>
<class>eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnSOLR</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--cluster</arg><arg>yarn</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--inputPath</arg><arg>${sourcePath}</arg>
<arg>--format</arg><arg>${format}</arg>
</spark>
<ok to="commit_solr_collection"/>
<error to="Kill"/>
</action>
<action name="commit_solr_collection">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--action</arg><arg>COMMIT</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
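An illustrative job.properties fragment for submitting this workflow; the property names come from the workflow parameters and spark-opts above, all values are placeholders, and the remaining spark2* and nameNode properties referenced in spark-opts are omitted here:

# all values are placeholders
oozie.wf.application.path=hdfs:///user/oozie/workflows/scholix_indexing
sourcePath=/user/scholix/json
isLookupUrl=http://services.example.org/isLookUp?wsdl
format=SMF
sparkExecutorMemory=6G
sparkDriverMemory=4G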

View File

@@ -22,7 +22,7 @@ class SolrAdminApplicationTest extends SolrTest {
SolrAdminApplication admin = new SolrAdminApplication(miniCluster.getSolrClient().getZkHost());
UpdateResponse rsp = (UpdateResponse) admin
.execute(SolrAdminApplication.Action.DELETE_BY_QUERY, DEFAULT_COLLECTION, "*:*", false);
.execute(SolrAdminApplication.Action.DELETE_BY_QUERY, DEFAULT_COLLECTION, "*:*", false, null);
assertEquals(0, rsp.getStatus());
}