Fixed an error when loading the Solr configuration files: on the cluster it is not possible to iterate over files packaged inside a jar, so the files are now read from an explicit file list

This commit is contained in:
Sandro La Bruzzo 2022-10-18 10:45:40 +02:00
parent 818a936468
commit ffa8cdf981
7 changed files with 141 additions and 129 deletions

View File

@ -94,6 +94,7 @@ public class SolrAdminApplication implements Closeable {
SolrUtil SolrUtil
.uploadZookeperConfig(this.solrClient.getZkStateReader().getZkClient(), collection, true, fields); .uploadZookeperConfig(this.solrClient.getZkStateReader().getZkClient(), collection, true, fields);
SolrUtil.createCollection(this.solrClient, collection, 48, 1, 12, collection); SolrUtil.createCollection(this.solrClient, collection, 48, 1, 12, collection);
return null;
default: default:
throw new IllegalArgumentException("action not managed: " + action); throw new IllegalArgumentException("action not managed: " + action);

View File

@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -56,9 +57,11 @@ public class SolrUtil {
private static final char DELIMITER = '$'; private static final char DELIMITER = '$';
private static final String CONF_BASE_PATH = "/eu/dnetlib/dhp/oa/provision/conf"; public static final String CONF_BASE_PATH = "/eu/dnetlib/dhp/oa/provision/conf/";
public static final String CONF_FILE_BASE_PATH = "/eu/dnetlib/dhp/oa/provision/conf/files/"; // public static final String CONF_FILE_BASE_PATH = "/eu/dnetlib/dhp/oa/provision/conf/files/";
public static final String LIST_FILE_BASE_PATH = "/eu/dnetlib/dhp/oa/provision/conf/files/file_list";
private static final String SCHEMA_TEMPLATE_PATH = "/eu/dnetlib/dhp/oa/provision/conf/schemaTemplate.xslt"; private static final String SCHEMA_TEMPLATE_PATH = "/eu/dnetlib/dhp/oa/provision/conf/schemaTemplate.xslt";
@ -155,6 +158,7 @@ public class SolrUtil {
} }
private static String loadFileInClassPath(final String aPath) { private static String loadFileInClassPath(final String aPath) {
System.out.println("LOAD FILE FROM PATH: " + aPath);
try { try {
return IOUtils return IOUtils
.toString(Objects.requireNonNull(SolrUtil.class.getResourceAsStream(aPath)), Charset.defaultCharset()); .toString(Objects.requireNonNull(SolrUtil.class.getResourceAsStream(aPath)), Charset.defaultCharset());
@ -164,7 +168,7 @@ public class SolrUtil {
} }
public static Map<String, String> getServiceProperties() throws IOException { public static Map<String, String> getServiceProperties() throws IOException {
final String properties = loadFileInClassPath(CONF_BASE_PATH + "/service_properties.json"); final String properties = loadFileInClassPath(CONF_BASE_PATH + "service_properties.json");
final ObjectMapper mapper = new ObjectMapper(); final ObjectMapper mapper = new ObjectMapper();
TypeFactory typeFactory = mapper.getTypeFactory(); TypeFactory typeFactory = mapper.getTypeFactory();
MapType mapType = typeFactory.constructMapType(HashMap.class, String.class, String.class); MapType mapType = typeFactory.constructMapType(HashMap.class, String.class, String.class);
@ -173,7 +177,7 @@ public class SolrUtil {
public static String getConfig() throws Exception { public static String getConfig() throws Exception {
final Map<String, String> p = getServiceProperties(); final Map<String, String> p = getServiceProperties();
final String st = loadFileInClassPath(CONF_BASE_PATH + "/solrconfig.xml.st"); final String st = loadFileInClassPath(CONF_BASE_PATH + "solrconfig.xml.st");
final ST solrConfig = new ST(st, DELIMITER, DELIMITER); final ST solrConfig = new ST(st, DELIMITER, DELIMITER);
p.forEach(solrConfig::add); p.forEach(solrConfig::add);
return solrConfig.render(); return solrConfig.render();
@ -204,22 +208,15 @@ public class SolrUtil {
res.put("solrconfig.xml", getConfig().getBytes()); res.put("solrconfig.xml", getConfig().getBytes());
log.debug("adding solrconfig.xml to the resource map"); log.debug("adding solrconfig.xml to the resource map");
String data = IOUtils
Files .toString(Objects.requireNonNull(SolrUtil.class.getResourceAsStream(LIST_FILE_BASE_PATH)));
.list( Arrays.stream(data.split("\n")).forEach(s -> {
Paths.get(Objects.requireNonNull(SolrUtil.class.getResource(CONF_FILE_BASE_PATH)).getPath())) final String name = s.replace(CONF_BASE_PATH + "files/", "");
.map(Path::getFileName) res
.forEach(s -> { .put(
log.debug(String.format("put file from path %s", CONF_FILE_BASE_PATH + s)); name,
res Objects.requireNonNull(loadFileInClassPath(s)).getBytes(StandardCharsets.UTF_8));
.put( });
String.valueOf(s),
Objects
.requireNonNull(loadFileInClassPath(CONF_FILE_BASE_PATH + s))
.getBytes(StandardCharsets.UTF_8));
});
return res; return res;
} catch (Throwable e) { } catch (Throwable e) {
throw new Exception("failed to build configuration", e); throw new Exception("failed to build configuration", e);

View File

@ -7,6 +7,7 @@ import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.lucidworks.spark.util.SolrSupport; import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.ProvisionConstants;
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
import eu.dnetlib.dhp.oa.provision.scholix.ScholixToSolr; import eu.dnetlib.dhp.oa.provision.scholix.ScholixToSolr;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
@ -33,7 +35,7 @@ public class SparkIndexCollectionOnSOLR {
// LOGGER initialized // LOGGER initialized
private static final Logger log = LoggerFactory.getLogger(SparkIndexCollectionOnSOLR.class); private static final Logger log = LoggerFactory.getLogger(SparkIndexCollectionOnSOLR.class);
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException, ParseException {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
@ -42,14 +44,16 @@ public class SparkIndexCollectionOnSOLR {
SparkIndexCollectionOnSOLR.class SparkIndexCollectionOnSOLR.class
.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json")))); .getResourceAsStream("/eu/dnetlib/dhp/sx/provision/index_solr_parameters.json"))));
parser.parseArgument(args);
final String cluster = parser.get("cluster"); final String cluster = parser.get("cluster");
log.info("Cluster is {}", cluster); log.info("Cluster is {}", cluster);
final String format = parser.get("format"); final String format = parser.get("format");
log.info("Index format name is {}", format); log.info("Index format name is {}", format);
final String isLookupUrl = parser.get("isLookupUrl"); final String isLookupUrl = parser.get("isURL");
log.info("isLookupUrl is {}", isLookupUrl); log.info("isURL is {}", isLookupUrl);
final String inputPath = parser.get("inputPath"); final String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath); log.info("inputPath: {}", inputPath);
@ -75,11 +79,12 @@ public class SparkIndexCollectionOnSOLR {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
final ISLookupClient isLookupClient = new ISLookupClient( final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
ISLookupClientFactory.getLookUpService(isLookupUrl)); final String zkHost = isLookup.getZkHost();
final String zkHost = isLookupClient.getZkHost();
log.info("zkHost: {}", zkHost); log.info("zkHost: {}", zkHost);
feedScholixToSOLRIndex(spark, inputPath, format, batchSize, zkHost); final String collection = ProvisionConstants.getCollectionName(format);
log.info("collection: {}", collection);
feedScholixToSOLRIndex(spark, inputPath, collection, batchSize, zkHost);
}); });
} }

View File

@ -0,0 +1,6 @@
/eu/dnetlib/dhp/oa/provision/conf/files/currency.xml
/eu/dnetlib/dhp/oa/provision/conf/files/elevate.xml
/eu/dnetlib/dhp/oa/provision/conf/files/params.json
/eu/dnetlib/dhp/oa/provision/conf/files/protwords.txt
/eu/dnetlib/dhp/oa/provision/conf/files/stopwords.txt
/eu/dnetlib/dhp/oa/provision/conf/files/synonyms.txt

View File

@ -7,7 +7,7 @@
}, },
{ {
"paramName":"is", "paramName":"is",
"paramLongName":"isLookupUrl", "paramLongName":"isURL",
"paramDescription":"the Information Service LookUp URL", "paramDescription":"the Information Service LookUp URL",
"paramRequired":true "paramRequired":true
}, },

View File

@ -1,111 +1,113 @@
<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>sourcePath</name> <name>sourcePath</name>
<description>the sourcePath of the json RDDs</description> <description>the sourcePath of the json RDDs</description>
</property> </property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>URL for the isLookup service</description> <description>URL for the isLookup service</description>
</property> </property>
<property> <property>
<name>solrDeletionQuery</name> <name>solrDeletionQuery</name>
<value>*:*</value> <value>*:*</value>
<description>query used in the deleted by query operation</description> <description>query used in the deleted by query operation</description>
</property> </property>
<property> <property>
<name>format</name> <name>format</name>
<description>metadata format name (SMF)</description> <description>metadata format name (SMF)</description>
</property> </property>
</parameters> </parameters>
<start to="create_solr_index"/> <start to="indexScholix"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="drop_solr_collection"> <action name="drop_solr_collection">
<java> <java>
<configuration> <configuration>
<property> <property>
<name>oozie.launcher.mapreduce.user.classpath.first</name> <name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value> <value>true</value>
</property> </property>
</configuration> </configuration>
<main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class> <main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg> <arg>--format</arg><arg>${format}</arg>
<arg>--action</arg><arg>DELETE_BY_QUERY</arg> <arg>--action</arg><arg>DELETE_BY_QUERY</arg>
<arg>--query</arg><arg>${solrDeletionQuery}</arg> <arg>--query</arg><arg>${solrDeletionQuery}</arg>
<arg>--commit</arg><arg>true</arg> <arg>--commit</arg><arg>true</arg>
</java> </java>
<ok to="create_solr_index"/> <ok to="create_solr_index"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="create_solr_index"> <action name="create_solr_index">
<java> <java>
<configuration> <configuration>
<property> <property>
<name>oozie.launcher.mapreduce.user.classpath.first</name> <name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value> <value>true</value>
</property> </property>
</configuration> </configuration>
<main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class> <main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg> <arg>--format</arg><arg>${format}</arg>
<arg>--action</arg><arg>CREATE</arg> <arg>--action</arg><arg>CREATE</arg>
</java> </java>
<ok to="indexScholix"/> <ok to="indexScholix"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="indexScholix"> <action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Index summary</name> <name>Index summary</name>
<class>eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnSOLR</class> <class>eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnSOLR</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors="8" --conf spark.shuffle.service.enabled=true
--driver-memory=${sparkDriverMemory} --executor-memory=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.dynamicAllocation.maxExecutors="16"
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --driver-memory=${sparkDriverMemory}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
</spark-opts> --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
<arg>--cluster</arg><arg>yarn</arg> --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> </spark-opts>
<arg>--inputPath</arg><arg>${sourcePath}</arg> <arg>--cluster</arg><arg>yarn</arg>
<arg>--format</arg><arg>${format}</arg> <arg>--isURL</arg><arg>${isLookupUrl}</arg>
<arg>--inputPath</arg><arg>${sourcePath}</arg>
<arg>--format</arg><arg>${format}</arg>
</spark> </spark>
<ok to="commit_solr_collection"/> <ok to="commit_solr_collection"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="commit_solr_collection"> <action name="commit_solr_collection">
<java> <java>
<configuration> <configuration>
<property> <property>
<name>oozie.launcher.mapreduce.user.classpath.first</name> <name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value> <value>true</value>
</property> </property>
</configuration> </configuration>
<main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class> <main-class>eu.dnetlib.dhp.oa.provision.SolrAdminApplication</main-class>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg> <arg>--format</arg><arg>${format}</arg>
<arg>--action</arg><arg>COMMIT</arg> <arg>--action</arg><arg>COMMIT</arg>
</java> </java>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -97,8 +97,9 @@ public class ScholixIndexingTest extends SolrTest {
.list( .list(
Paths Paths
.get( .get(
Objects.requireNonNull(getClass().getResource(SolrUtil.CONF_FILE_BASE_PATH)).getPath())) Objects.requireNonNull(getClass().getResource(SolrUtil.CONF_BASE_PATH + "files/")).getPath()))
.map(Path::getFileName) .map(Path::getFileName)
.filter(p -> !p.getFileName().toString().equalsIgnoreCase("file_list"))
.map(Path::toString) .map(Path::toString)
.collect(Collectors.toList()); .collect(Collectors.toList());
configurationFiles.add("schema.xml"); configurationFiles.add("schema.xml");