diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index fa1964773..8194d4d01 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -48,6 +48,80 @@
spark-solr
+
+ org.apache.solr
+ solr-test-framework
+ test
+
+
+ com.carrotsearch
+ *
+
+
+ com.carrotsearch.randomizedtesting
+ *
+
+
+
+ com.fasterxml.jackson.core
+ *
+
+
+ com.fasterxml.jackson.dataformat
+ *
+
+
+ org.codehaus.jackson
+ *
+
+
+ org.codehaus.janino
+ *
+
+
+ org.codehaus.woodstox
+ *
+
+
+
+
+
+ com.github.ben-manes.caffeine
+ *
+
+
+ com.google.guava
+ *
+
+
+ com.google.protobuf
+ *
+
+
+ com.lmax
+ *
+
+
+ com.tdunning
+ *
+
+
+ org.apache.hadoop
+ *
+
+
+
+
+
+
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ test
+
+
org.apache.httpcomponents
httpclient
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
new file mode 100644
index 000000000..8c8947298
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
@@ -0,0 +1,101 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.utils.ZkServers;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+public class SolrAdminApplication extends SolrApplication implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(SolrAdminApplication.class);
+
+ enum Action {
+ DELETE_BY_QUERY, COMMIT
+ }
+
+ private CloudSolrClient solrClient;
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SolrAdminApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json")));
+ parser.parseArgument(args);
+
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ final String format = parser.get("format");
+ log.info("format: {}", format);
+
+ final Action action = Action.valueOf(parser.get("action"));
+ log.info("action: {}", action);
+
+ final String query = parser.get("query");
+ log.info("query: {}", query);
+
+ final boolean commit = Optional
+ .ofNullable(parser.get("commit"))
+ .map(Boolean::valueOf)
+ .orElse(false);
+ log.info("commit: {}", commit);
+
+ final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
+
+ final String zkHost = getZkHost(isLookup);
+ log.info("zkHost: {}", zkHost);
+
+ final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
+ log.info("collection: {}", collection);
+
+ try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
+ app.execute(action, collection, query, commit);
+ }
+ }
+
+ public SolrAdminApplication(String zkHost) {
+ final ZkServers zk = ZkServers.newInstance(zkHost);
+ this.solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()).build();
+ }
+
+ public SolrResponse commit(String collection) throws IOException, SolrServerException {
+ return execute(Action.COMMIT, collection, null, true);
+ }
+
+ public SolrResponse execute(Action action, String collection, String query, boolean commit)
+ throws IOException, SolrServerException {
+ switch (action) {
+
+ case DELETE_BY_QUERY:
+ UpdateResponse rsp = solrClient.deleteByQuery(collection, query);
+ if (commit) {
+ solrClient.commit(collection);
+ }
+ return rsp;
+ case COMMIT:
+ return solrClient.commit(collection);
+ default:
+ throw new IllegalArgumentException("action not managed: " + action.toString());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ solrClient.close();
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java
new file mode 100644
index 000000000..a824c6c2c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java
@@ -0,0 +1,40 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+public abstract class SolrApplication {
+
+ private static final Logger log = LoggerFactory.getLogger(SolrApplication.class);
+
+ protected static final String LAYOUT = "index";
+ protected static final String INTERPRETATION = "openaire";
+ protected static final String SEPARATOR = "-";
+ protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
+
+ /**
+ * Method retrieves from the information system the zookeeper quorum of the Solr server
+ *
+ * @param isLookup
+ * @return the zookeeper quorum of the Solr server
+ * @throws ISLookUpException
+ */
+ protected static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
+ return doLookup(
+ isLookup,
+ "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
+ }
+
+ protected static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
+ log.info(String.format("running xquery: %s", xquery));
+ final String res = isLookup.getResourceProfileByQuery(xquery);
+ log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
+ return res;
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index ede7aa7b4..5b5596162 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -35,17 +35,12 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-public class XmlIndexingJob {
+public class XmlIndexingJob extends SolrApplication {
private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
- private static final String LAYOUT = "index";
- private static final String INTERPRETATION = "openaire";
- private static final String SEPARATOR = "-";
- public static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
-
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -208,23 +203,4 @@ public class XmlIndexingJob {
format));
}
- /**
- * Method retrieves from the information system the zookeeper quorum of the Solr server
- *
- * @param isLookup
- * @return the zookeeper quorum of the Solr server
- * @throws ISLookUpException
- */
- private static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
- return doLookup(
- isLookup,
- "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
- }
-
- private static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
- log.info(String.format("running xquery: %s", xquery));
- final String res = isLookup.getResourceProfileByQuery(xquery);
- log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
- return res;
- }
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ZkServers.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ZkServers.java
new file mode 100644
index 000000000..6cec3ed53
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ZkServers.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.oa.provision.utils;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+public class ZkServers {
+
+ private static final Log log = LogFactory.getLog(ZkServers.class);
+ public static final String SEPARATOR = "/";
+
+ private List hosts;
+
+ private Optional chroot;
+
+ public static ZkServers newInstance(final String zkUrl) {
+
+ // quorum0:2182,quorum1:2182,quorum2:2182,quorum3:2182,quorum4:2182/solr-dev-openaire
+ String urls = zkUrl;
+ final Optional chRoot = Optional.of(SEPARATOR + StringUtils.substringAfterLast(zkUrl, SEPARATOR));
+ if (chRoot.isPresent() && StringUtils.isNotBlank(chRoot.get())) {
+ log.debug(String.format("found zk chroot %s", chRoot));
+ urls = zkUrl.replace(chRoot.get(), "");
+ }
+
+ final List urlList = Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(urls));
+ log.debug(String.format("zk urls %s", zkUrl));
+
+ return new ZkServers(urlList, chRoot);
+ }
+
+ public ZkServers(List hosts, Optional chroot) {
+ this.hosts = hosts;
+ this.chroot = chroot;
+ }
+
+ public List getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(List hosts) {
+ this.hosts = hosts;
+ }
+
+ public Optional getChroot() {
+ return chroot;
+ }
+
+ public void setChroot(Optional chroot) {
+ this.chroot = chroot;
+ }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
new file mode 100644
index 000000000..23eca2f7b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "isu",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "the URL to the ISLookUp Service",
+ "paramRequired": true
+ },
+ {
+ "paramName": "f",
+ "paramLongName": "format",
+ "paramDescription": "metadata format profile name",
+ "paramRequired": true
+ },
+ {
+ "paramName": "a",
+ "paramLongName": "action",
+ "paramDescription": "the action to be performed by the application",
+ "paramRequired": true
+ },
+ {
+ "paramName": "q",
+ "paramLongName": "query",
+ "paramDescription": "the query",
+ "paramRequired": false
+ },
+ {
+ "paramName": "c",
+ "paramLongName": "commit",
+ "paramDescription": "should the action be followed by a commit?",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 91ced378c..192a6f59b 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -37,7 +37,11 @@
batchSize
number of records to be included in each indexing request
-
+
+ solrDeletionQuery
+ *:*
+ query used in the deleted by query operation
+
sparkDriverMemoryForJoining
memory for driver process
@@ -109,6 +113,7 @@
${wf:conf('resumeFrom') eq 'fork_join_related_entities'}
${wf:conf('resumeFrom') eq 'fork_join_all_entities'}
${wf:conf('resumeFrom') eq 'convert_to_xml'}
+ ${wf:conf('resumeFrom') eq 'drop_solr_collection'}
${wf:conf('resumeFrom') eq 'to_solr_index'}
@@ -584,12 +589,25 @@
- ${wf:conf('shouldIndex') eq 'true'}
+ ${wf:conf('shouldIndex') eq 'true'}
${wf:conf('shouldIndex') eq 'false'}
-
+
+
+
+ eu.dnetlib.dhp.oa.provision.SolrAdminApplication
+ --isLookupUrl${isLookupUrl}
+ --format${format}
+ --actionDELETE_BY_QUERY
+ --query${solrDeletionQuery}
+ --committrue
+
+
+
+
+
yarn
@@ -615,6 +633,17 @@
--format${format}
--batchSize${batchSize}
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.provision.SolrAdminApplication
+ --isLookupUrl${isLookupUrl}
+ --format${format}
+ --actionCOMMIT
+
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java
index 528532edd..6818cf6a5 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
@@ -20,8 +21,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -31,7 +30,8 @@ public class PrepareRelationsJobTest {
public static final String SUBRELTYPE = "subRelType";
public static final String OUTCOME = "outcome";
- public static final String SUPPLEMENT = "supplement";
+ public static final String PARTICIPATION = "participation";
+ public static final String AFFILIATION = "affiliation";
private static SparkSession spark;
@@ -64,7 +64,7 @@ public class PrepareRelationsJobTest {
@Test
public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception {
- final int maxRelations = 10;
+ final int maxRelations = 20;
PrepareRelationsJob
.main(
new String[] {
@@ -73,7 +73,8 @@ public class PrepareRelationsJobTest {
"-outputPath", testPath.toString(),
"-relPartitions", "10",
"-relationFilter", "asd",
- "-maxRelations", String.valueOf(maxRelations)
+ "-sourceMaxRelations", String.valueOf(maxRelations),
+ "-targetMaxRelations", String.valueOf(maxRelations * 100)
});
Dataset out = spark
@@ -82,19 +83,31 @@ public class PrepareRelationsJobTest {
.as(Encoders.bean(Relation.class))
.cache();
- Assertions.assertEquals(10, out.count());
+ Assertions.assertEquals(maxRelations, out.count());
Dataset freq = out
.toDF()
.cube(SUBRELTYPE)
.count()
.filter((FilterFunction) value -> !value.isNullAt(0));
- long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count");
- long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");
- Assertions.assertTrue(outcome > supplement);
+ log.info(freq.collectAsList().toString());
+
+ long outcome = getRows(freq, OUTCOME).get(0).getAs("count");
+ long participation = getRows(freq, PARTICIPATION).get(0).getAs("count");
+ long affiliation = getRows(freq, AFFILIATION).get(0).getAs("count");
+
+ Assertions.assertTrue(participation == outcome);
+ Assertions.assertTrue(outcome > affiliation);
+ Assertions.assertTrue(participation > affiliation);
+
Assertions.assertEquals(7, outcome);
- Assertions.assertEquals(3, supplement);
+ Assertions.assertEquals(7, participation);
+ Assertions.assertEquals(6, affiliation);
+ }
+
+ protected List getRows(Dataset freq, String col) {
+ return freq.filter(freq.col(SUBRELTYPE).equalTo(col)).collectAsList();
}
}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
new file mode 100644
index 000000000..cbd7b2de2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
@@ -0,0 +1,130 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+import java.io.File;
+import java.nio.file.Path;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.XMLResponseParser;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.Assert;
+
+public class SolrAdminApplicationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(SolrAdminApplicationTest.class);
+ public static final String DEFAULT_COLLECTION = "testCollection";
+ public static final String CONFIG_NAME = "testConfig";
+
+ private static MiniSolrCloudCluster miniCluster;
+ private static CloudSolrClient cloudSolrClient;
+
+ @TempDir
+ public static Path tempDir;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+
+ // random unassigned HTTP port
+ final int jettyPort = 0;
+
+ final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build();
+
+ // create a MiniSolrCloudCluster instance
+ miniCluster = new MiniSolrCloudCluster(2, tempDir, jettyConfig);
+
+ // Upload Solr configuration directory to ZooKeeper
+ String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig";
+ File configDir = new File(solrZKConfigDir);
+
+ miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME);
+
+ // override settings in the solrconfig include
+ System.setProperty("solr.tests.maxBufferedDocs", "100000");
+ System.setProperty("solr.tests.maxIndexingThreads", "-1");
+ System.setProperty("solr.tests.ramBufferSizeMB", "100");
+
+ // use non-test classes so RandomizedRunner isn't necessary
+ System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
+ System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");
+
+ cloudSolrClient = miniCluster.getSolrClient();
+ cloudSolrClient.setRequestWriter(new RequestWriter());
+ cloudSolrClient.setParser(new XMLResponseParser());
+ cloudSolrClient.setDefaultCollection(DEFAULT_COLLECTION);
+ cloudSolrClient.connect();
+
+ log.info(new ConfigSetAdminRequest.List().process(cloudSolrClient).toString());
+ log.info(CollectionAdminRequest.ClusterStatus.getClusterStatus().process(cloudSolrClient).toString());
+
+ createCollection(cloudSolrClient, DEFAULT_COLLECTION, 2, 1, CONFIG_NAME);
+ }
+
+ @AfterAll
+ public static void shutDown() throws Exception {
+ miniCluster.shutdown();
+ }
+
+ protected static NamedList