added module for indexing scholexplorer

This commit is contained in:
Sandro La Bruzzo 2024-06-18 17:06:01 +02:00
parent 844a31f7a6
commit 2a7e5de094
17 changed files with 661 additions and 9 deletions

View File

@ -14,7 +14,7 @@
</property>
</parameters>
<start to="DropAndCreateIndex"/>
<start to="indexSummary"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -42,7 +42,7 @@
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.dynamicAllocation.maxExecutors="16"
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -50,10 +50,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/summaries_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/summary_json</arg>
<arg>--index</arg><arg>summary</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
<arg>--cluster</arg><arg>cluster1</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
@ -68,7 +68,7 @@
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.dynamicAllocation.maxExecutors="48"
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -76,10 +76,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/scholix</arg>
<arg>--index</arg><arg>scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
<arg>--cluster</arg><arg>cluster1</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-scholix-provision</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${dhp.jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.woodstox</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.lmax</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<artifactId>ant</artifactId>
<groupId>org.apache.ant</groupId>
</exclusion>
<exclusion>
<artifactId>antlr4-runtime</artifactId>
<groupId>org.antlr</groupId>
</exclusion>
<exclusion>
<artifactId>woodstox-core</artifactId>
<groupId>com.fasterxml.woodstox</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>1.2.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,17 @@
package eu.dnetlib.sx.index;
import org.elasticsearch.action.index.IndexRequest;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
public abstract class RequestManager {
String extractIdentifier(final String json, final String jpath) {
DocumentContext jsonContext = JsonPath.parse(json);
return jsonContext.read(jpath);
}
public abstract IndexRequest createRequest(final String line, final String indexName);
}

View File

@ -0,0 +1,13 @@
package eu.dnetlib.sx.index;
public class RequestManagerFactory {
public static RequestManager fromType(final String type) {
if ("scholix".equalsIgnoreCase(type))
return new ScholixRequestManager();
if ("summary".equalsIgnoreCase(type))
return new SummaryRequestManager();
throw new IllegalArgumentException("unexpected type");
}
}

View File

@ -0,0 +1,19 @@
package eu.dnetlib.sx.index;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
public class ScholixRequestManager extends RequestManager {
final static String identifierPath = "$.identifier";
@Override
public IndexRequest createRequest(final String line, final String indexName) {
return new IndexRequest()
.index(indexName)
.id(extractIdentifier(line, identifierPath))
.source(line, XContentType.JSON);
}
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.sx.index;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SummaryRequestManager extends RequestManager {
final static String identifierPath = "$.id";
final static ObjectMapper mapper = new ObjectMapper();
private String constructSource(String line) {
Map<String, Object> params = new HashMap<>();
params.put("body", line);
try {
return mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public IndexRequest createRequest(String line, String indexName) {
return new IndexRequest()
.index(indexName)
.id(extractIdentifier(line, identifierPath))
.source(constructSource(line), XContentType.JSON);
}
}

View File

@ -0,0 +1,109 @@
package eu.dnetlib.sx.index.feeder;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.sx.index.RequestManager;
public class FeedWorker extends Thread {
private final FileSystem fileSystem;
public static String JOB_COMPLETE = "JOB_COMPLETE";
private final BlockingQueue<String> queue;
private final Logger log = LoggerFactory.getLogger(getClass().getName());
private boolean hasComplete = false;
private final String index;
private final RequestManager requestCreator;
private final RestHighLevelClient client;
public FeedWorker(
FileSystem fileSystem, BlockingQueue<String> queue,
RequestManager requestCreator, final String host, final String index) {
this.fileSystem = fileSystem;
this.queue = queue;
this.index = index;
this.requestCreator = requestCreator;
this.client = createRESTClient(host);
}
private RestHighLevelClient createRESTClient(String host) {
System.out.println("Creating client with host = " + host);
return new RestHighLevelClient(
RestClient.builder(new HttpHost(host, 9200, "http")));
}
@Override
public void run() {
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
while (!hasComplete) {
try {
final String nextItem = queue.take();
if (JOB_COMPLETE.equalsIgnoreCase(nextItem)) {
hasComplete = true;
} else {
System.out.println("Parsing " + nextItem + "\n");
final Path currentPath = new Path(nextItem);
final CompressionCodec codec = factory.getCodec(currentPath);
InputStream gzipInputStream = codec.createInputStream(fileSystem.open(currentPath));
Reader decoder = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(decoder);
doIndexChunk(reader);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
private BulkRequest createRequest() {
final BulkRequest request = new BulkRequest();
request.timeout("2m");
return request;
}
private void doIndexChunk(final BufferedReader reader) throws Exception {
String next;
BulkRequest request = createRequest();
int i = 0;
while ((next = reader.readLine()) != null) {
request.add(this.requestCreator.createRequest(next, index));
if (i++ % 10000 == 0) {
client.bulk(request, RequestOptions.DEFAULT);
request = createRequest();
System.out.printf("Bulk-> %d items \n", i);
log.debug("Bulk-> {} items ", i);
}
}
client.bulk(request, RequestOptions.DEFAULT);
System.out.printf(" Final Bulk-> %d items \n", i);
log.debug("Final Bulk-> {} items ", i);
}
}

View File

@ -0,0 +1,106 @@
package eu.dnetlib.sx.index.feeder;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.sx.index.RequestManagerFactory;
public class IndexFeed {
private final FileSystem fileSystem;
private final static Logger log = LoggerFactory.getLogger(IndexFeed.class);
public IndexFeed(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
IndexFeed.class
.getResourceAsStream(
"/eu/dnetlib/sx/provision/feed_index_params.json"))));
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI is {}", hdfsuri);
final String sourcePath = argumentParser.get("sourcePath");
log.info("sourcePath is {}", sourcePath);
final String cluster = argumentParser.get("cluster");
log.info("cluster is {}", cluster);
final String index = argumentParser.get("index");
log.info("index is {}", index);
final String clusterJson = IOUtils
.toString(
Objects
.requireNonNull(
IndexFeed.class.getResourceAsStream("/eu/dnetlib/sx/provision/cluster.json")));
@SuppressWarnings("unchecked")
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
if (!clusterMap.containsKey(cluster)) {
throw new RuntimeException(
String.format("Cluster %s not found, expected values cluster1, cluster2", cluster));
}
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
new IndexFeed(fileSystem).run(sourcePath, clusterMap.get(cluster), index);
}
public void run(final String sourcePath, final String host, final String index) throws Exception {
RemoteIterator<LocatedFileStatus> ls = fileSystem.listFiles(new Path(sourcePath), false);
final List<FeedWorker> workers = new ArrayList<>();
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3000);
for (String currentHost : host.split(",")) {
workers
.add(
new FeedWorker(fileSystem, queue, RequestManagerFactory.fromType(index), currentHost.trim(),
index));
}
workers.forEach(Thread::start);
while (ls.hasNext()) {
LocatedFileStatus current = ls.next();
if (current.getPath().getName().endsWith(".gz")) {
queue.put(current.getPath().toString());
}
}
for (FeedWorker worker : workers) {
try {
queue.put(FeedWorker.JOB_COMPLETE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
worker.join();
}
}
}

View File

@ -0,0 +1,4 @@
{
"cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
"cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
}

View File

@ -0,0 +1,27 @@
[
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the name node param",
"paramRequired": true
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "The source Path",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "cluster",
"paramDescription": "The cluster name",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "index",
"paramDescription": "The index name",
"paramRequired": true
}
]

View File

@ -0,0 +1,14 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,62 @@
<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the sourcePath of the json RDDs</description>
</property>
<property>
<name>esCluster</name>
<description>the Index cluster</description>
</property>
</parameters>
<start to="indexSummary"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
<arg>-namenode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/summary_json</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
<arg>--index</arg><arg>summary</arg>
</java>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
<arg>-namenode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/scholix</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
<arg>--index</arg><arg>scholix</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,63 @@
{
"settings": {
"index": {
"number_of_shards": "140",
"number_of_replicas": "0",
"refresh_interval": "-1",
"translog.flush_threshold_size": "2048MB",
"codec": "best_compression"
}
},
"mappings": {
"properties": {
"identifier": {
"type": "keyword"
},
"linkProviders": {
"type": "text"
},
"publicationDate": {
"type": "date"
},
"relationType": {
"type": "keyword"
},
"sourceId": {
"type": "keyword"
},
"sourcePid": {
"type": "keyword"
},
"sourcePidType": {
"type": "keyword"
},
"sourcePublisher": {
"type": "text"
},
"sourceSubType": {
"type": "keyword"
},
"sourceType": {
"type": "keyword"
},
"targetId": {
"type": "keyword"
},
"targetPid": {
"type": "keyword"
},
"targetPidType": {
"type": "keyword"
},
"targetPublisher": {
"type": "text"
},
"targetSubType": {
"type": "keyword"
},
"targetType": {
"type": "keyword"
}
}
}
}

View File

@ -0,0 +1,19 @@
{
"settings": {
"index": {
"number_of_shards": "5",
"number_of_replicas": "0",
"refresh_interval": "-1",
"translog.flush_threshold_size": "2048MB",
"codec": "best_compression"
}
},
"mappings": {
"properties": {
"body": {
"type": "text",
"index": false
}
}
}
}

View File

@ -0,0 +1,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;
import eu.dnetlib.sx.index.feeder.IndexFeed;
public class IndexPathTest {
@Test
public void testIndexing() throws Exception {
Configuration conf = new Configuration();
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fs = FileSystem.get(conf);
new IndexFeed(fs).run("file:///Users/sandro/Downloads/scholix/summary_js", "localhost", "summary");
}
}

View File

@ -0,0 +1,11 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.logger.org = ERROR
log4j.logger.eu.dnetlib = DEBUG
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

View File

@ -43,6 +43,7 @@
<module>dhp-doiboost</module>
<module>dhp-impact-indicators</module>
<module>dhp-swh</module>
<module>dhp-scholix-provision</module>
</modules>
<pluginRepositories>