Compare commits

...

3 Commits

21 changed files with 819 additions and 18 deletions

View File

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty} import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
import eu.dnetlib.dhp.schema.sx.scholix.{ import eu.dnetlib.dhp.schema.sx.scholix.{
Scholix, Scholix,
ScholixCollectedFrom, ScholixCollectedFrom,
@ -10,6 +11,7 @@ import eu.dnetlib.dhp.schema.sx.scholix.{
ScholixRelationship, ScholixRelationship,
ScholixResource ScholixResource
} }
import org.apache.logging.log4j.core.appender.ConsoleAppender.Target
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
@ -26,6 +28,16 @@ case class RelationInfo(
) {} ) {}
case class RelKeyValue(key: String, value: String) {} case class RelKeyValue(key: String, value: String) {}
case class SummaryResource(
id: String,
typology: String,
subType: String,
pids: List[String],
pidTypes: List[String],
publishers: List[String],
date: String
) {}
object ScholexplorerUtils { object ScholexplorerUtils {
val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier" val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
@ -86,6 +98,99 @@ object ScholexplorerUtils {
.toList .toList
} }
def generateSummaryResource(input: ScholixResource): SummaryResource = {
val distinctIds = if (input.getIdentifier != null) {
input.getIdentifier.asScala.map(i => i.getIdentifier).distinct.toList
} else List()
val distinctTypes = if (input.getIdentifier != null) {
input.getIdentifier.asScala.map(i => i.getSchema).distinct.toList
} else List()
val distinctPublishers = if (input.getPublisher != null) {
input.getPublisher.asScala.map(i => i.getName).distinct.sorted.take(5).toList
} else List()
SummaryResource(
id = input.getDnetIdentifier,
typology = input.getObjectType,
subType = input.getObjectSubType,
pids = distinctIds,
pidTypes = distinctTypes,
publishers = distinctPublishers,
date = input.getPublicationDate
)
}
def generateScholixFlat(relation: RelationInfo, summary: SummaryResource, updateSource: Boolean): ScholixFlat = {
val scholix = new ScholixFlat
scholix.setIdentifier(relation.id)
if (relation.collectedfrom != null && relation.collectedfrom.nonEmpty)
scholix.setLinkProviders(
relation.collectedfrom
.map(cf => {
cf.value
})
.distinct
.sorted
.take(5)
.toList
.asJava
)
else {
scholix.setLinkProviders(List("OpenAIRE").asJava)
}
val semanticRelation = relations.getOrElse(relation.relclass.toLowerCase, null)
if (semanticRelation == null)
return null
scholix.setRelationType(semanticRelation.original)
scholix.setPublicationDate(summary.date)
if (updateSource) {
if (summary.pids.isEmpty)
return null
scholix.setSourceId(summary.id)
scholix.setSourcePid(summary.pids.asJava)
scholix.setSourcePidType(summary.pidTypes.asJava)
scholix.setSourceType(summary.typology)
scholix.setSourceSubType(summary.subType)
if (summary.publishers.nonEmpty) {
scholix.setSourcePublisher(summary.publishers.asJava)
}
} else {
if (summary.pids.isEmpty)
return null
scholix.setTargetId(summary.id)
scholix.setTargetPid(summary.pids.asJava)
scholix.setTargetPidType(summary.pidTypes.asJava)
scholix.setTargetType(summary.typology)
scholix.setTargetSubType(summary.subType)
if (summary.publishers.nonEmpty) {
scholix.setTargetPublisher(summary.publishers.asJava)
}
}
scholix
}
def mergeScholixFlat(source: ScholixFlat, target: ScholixFlat): ScholixFlat = {
if (source.getPublicationDate == null) {
source.setPublicationDate(target.getPublicationDate)
}
source.setTargetId(target.getTargetId)
source.setTargetPid(target.getTargetPid)
source.setTargetPidType(target.getTargetPidType)
source.setTargetType(target.getTargetType)
source.setTargetSubType(target.getTargetSubType)
if (source.getLinkProviders != null)
source.setTargetPublisher(target.getTargetPublisher)
else if (source.getLinkProviders != null && target.getLinkProviders != null) {
source.setLinkProviders(
source.getLinkProviders.asScala.union(target.getLinkProviders.asScala).sorted.distinct.take(5).asJava
)
}
source
}
def generateScholixResourceFromResult(result: Result): ScholixResource = { def generateScholixResourceFromResult(result: Result): ScholixResource = {
if (result.getInstance() == null || result.getInstance().size() == 0) if (result.getInstance() == null || result.getInstance().size() == 0)

View File

@ -10,6 +10,7 @@ import eu.dnetlib.dhp.schema.oaf.{
Software, Software,
Dataset => OafDataset Dataset => OafDataset
} }
import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
import org.apache.spark.sql.functions.{col, concat, expr, first, md5} import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
@ -29,7 +30,7 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
log.info("targetPath: {}", targetPath) log.info("targetPath: {}", targetPath)
generateBidirectionalRelations(sourcePath, targetPath, spark) generateBidirectionalRelations(sourcePath, targetPath, spark)
generateScholixResource(sourcePath, targetPath, spark) generateScholixResource(sourcePath, targetPath, spark)
generateScholix(targetPath, spark) generateFlatScholix(targetPath, spark)
} }
def generateScholixResource(inputPath: String, outputPath: String, spark: SparkSession): Unit = { def generateScholixResource(inputPath: String, outputPath: String, spark: SparkSession): Unit = {
@ -101,6 +102,47 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
} }
def generateFlatScholix(outputPath: String, spark: SparkSession): Unit = {
import spark.implicits._
implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
implicit val scholixEncoder: Encoder[ScholixFlat] = Encoders.bean(classOf[ScholixFlat])
val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource]
resource
.map(s => ScholexplorerUtils.generateSummaryResource(s))
.write
.mode(SaveMode.Overwrite)
.save(s"$outputPath/summary")
val summaries = spark.read.load(s"$outputPath/summary").as[SummaryResource]
relations
.joinWith(summaries, relations("source") === summaries("id"))
.map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, true))
.write
.mode(SaveMode.Overwrite)
.save(s"$outputPath/scholix_source")
val scholix_source = spark.read.load(s"$outputPath/scholix_source").as[ScholixFlat]
relations
.joinWith(summaries, relations("target") === summaries("id"))
.map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, false))
.write
.mode(SaveMode.Overwrite)
.save(s"$outputPath/scholix_target")
val scholix_target = spark.read.load(s"$outputPath/scholix_target").as[ScholixFlat]
scholix_source
.joinWith(scholix_target, scholix_source("identifier") === scholix_target("identifier"), "inner")
.map(s => ScholexplorerUtils.mergeScholixFlat(s._1, s._2))
.write
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(s"$outputPath/scholix")
}
def generateScholix(outputPath: String, spark: SparkSession): Unit = { def generateScholix(outputPath: String, spark: SparkSession): Unit = {
implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource]) implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix]) implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])

View File

@ -14,13 +14,15 @@ class ScholixGenerationTest {
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
val app = new SparkCreateScholexplorerDump(null, null, null) val app = new SparkCreateScholexplorerDump(null, null, null)
// app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
// app.generateBidirectionalRelations( val basePath = "/Users/sandro/Downloads"
// "/home/sandro/Downloads/scholix_sample/", app.generateScholixResource(s"$basePath/scholix_sample/", s"$basePath/scholix/", spark)
// "/home/sandro/Downloads/scholix/", app.generateBidirectionalRelations(
// spark s"$basePath/scholix_sample/",
// ) s"$basePath/scholix/",
app.generateScholix("/home/sandro/Downloads/scholix/", spark) spark
)
app.generateFlatScholix(s"$basePath/scholix/", spark)
} }
} }

View File

@ -14,7 +14,7 @@
</property> </property>
</parameters> </parameters>
<start to="DropAndCreateIndex"/> <start to="indexSummary"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -42,7 +42,7 @@
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8" --conf spark.dynamicAllocation.maxExecutors="16"
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -50,10 +50,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/summaries_json</arg> <arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/summary_json</arg>
<arg>--index</arg><arg>${index}_object</arg> <arg>--index</arg><arg>summary</arg>
<arg>--idPath</arg><arg>id</arg> <arg>--idPath</arg><arg>id</arg>
<arg>--cluster</arg><arg>${esCluster}</arg> <arg>--cluster</arg><arg>cluster1</arg>
</spark> </spark>
<ok to="indexScholix"/> <ok to="indexScholix"/>
<error to="Kill"/> <error to="Kill"/>
@ -68,7 +68,7 @@
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8" --conf spark.dynamicAllocation.maxExecutors="48"
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -76,10 +76,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/scholix_json</arg> <arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/scholix</arg>
<arg>--index</arg><arg>${index}_scholix</arg> <arg>--index</arg><arg>scholix</arg>
<arg>--idPath</arg><arg>identifier</arg> <arg>--idPath</arg><arg>identifier</arg>
<arg>--cluster</arg><arg>${esCluster}</arg> <arg>--cluster</arg><arg>cluster1</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-scholix-provision</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${dhp.jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.woodstox</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.lmax</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<artifactId>ant</artifactId>
<groupId>org.apache.ant</groupId>
</exclusion>
<exclusion>
<artifactId>antlr4-runtime</artifactId>
<groupId>org.antlr</groupId>
</exclusion>
<exclusion>
<artifactId>woodstox-core</artifactId>
<groupId>com.fasterxml.woodstox</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>*</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>1.2.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,17 @@
package eu.dnetlib.sx.index;
import org.elasticsearch.action.index.IndexRequest;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
public abstract class RequestManager {
String extractIdentifier(final String json, final String jpath) {
DocumentContext jsonContext = JsonPath.parse(json);
return jsonContext.read(jpath);
}
public abstract IndexRequest createRequest(final String line, final String indexName);
}

View File

@ -0,0 +1,13 @@
package eu.dnetlib.sx.index;
public class RequestManagerFactory {
public static RequestManager fromType(final String type) {
if ("scholix".equalsIgnoreCase(type))
return new ScholixRequestManager();
if ("summary".equalsIgnoreCase(type))
return new SummaryRequestManager();
throw new IllegalArgumentException("unexpected type");
}
}

View File

@ -0,0 +1,19 @@
package eu.dnetlib.sx.index;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
public class ScholixRequestManager extends RequestManager {
final static String identifierPath = "$.identifier";
@Override
public IndexRequest createRequest(final String line, final String indexName) {
return new IndexRequest()
.index(indexName)
.id(extractIdentifier(line, identifierPath))
.source(line, XContentType.JSON);
}
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.sx.index;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SummaryRequestManager extends RequestManager {
final static String identifierPath = "$.id";
final static ObjectMapper mapper = new ObjectMapper();
private String constructSource(String line) {
Map<String, Object> params = new HashMap<>();
params.put("body", line);
try {
return mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public IndexRequest createRequest(String line, String indexName) {
return new IndexRequest()
.index(indexName)
.id(extractIdentifier(line, identifierPath))
.source(constructSource(line), XContentType.JSON);
}
}

View File

@ -0,0 +1,109 @@
package eu.dnetlib.sx.index.feeder;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.sx.index.RequestManager;
public class FeedWorker extends Thread {
private final FileSystem fileSystem;
public static String JOB_COMPLETE = "JOB_COMPLETE";
private final BlockingQueue<String> queue;
private final Logger log = LoggerFactory.getLogger(getClass().getName());
private boolean hasComplete = false;
private final String index;
private final RequestManager requestCreator;
private final RestHighLevelClient client;
public FeedWorker(
FileSystem fileSystem, BlockingQueue<String> queue,
RequestManager requestCreator, final String host, final String index) {
this.fileSystem = fileSystem;
this.queue = queue;
this.index = index;
this.requestCreator = requestCreator;
this.client = createRESTClient(host);
}
private RestHighLevelClient createRESTClient(String host) {
System.out.println("Creating client with host = " + host);
return new RestHighLevelClient(
RestClient.builder(new HttpHost(host, 9200, "http")));
}
@Override
public void run() {
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
while (!hasComplete) {
try {
final String nextItem = queue.take();
if (JOB_COMPLETE.equalsIgnoreCase(nextItem)) {
hasComplete = true;
} else {
System.out.println("Parsing " + nextItem + "\n");
final Path currentPath = new Path(nextItem);
final CompressionCodec codec = factory.getCodec(currentPath);
InputStream gzipInputStream = codec.createInputStream(fileSystem.open(currentPath));
Reader decoder = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(decoder);
doIndexChunk(reader);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
private BulkRequest createRequest() {
final BulkRequest request = new BulkRequest();
request.timeout("2m");
return request;
}
private void doIndexChunk(final BufferedReader reader) throws Exception {
String next;
BulkRequest request = createRequest();
int i = 0;
while ((next = reader.readLine()) != null) {
request.add(this.requestCreator.createRequest(next, index));
if (i++ % 10000 == 0) {
client.bulk(request, RequestOptions.DEFAULT);
request = createRequest();
System.out.printf("Bulk-> %d items \n", i);
log.debug("Bulk-> {} items ", i);
}
}
client.bulk(request, RequestOptions.DEFAULT);
System.out.printf(" Final Bulk-> %d items \n", i);
log.debug("Final Bulk-> {} items ", i);
}
}

View File

@ -0,0 +1,106 @@
package eu.dnetlib.sx.index.feeder;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.sx.index.RequestManagerFactory;
public class IndexFeed {
private final FileSystem fileSystem;
private final static Logger log = LoggerFactory.getLogger(IndexFeed.class);
public IndexFeed(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
IndexFeed.class
.getResourceAsStream(
"/eu/dnetlib/sx/provision/feed_index_params.json"))));
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI is {}", hdfsuri);
final String sourcePath = argumentParser.get("sourcePath");
log.info("sourcePath is {}", sourcePath);
final String cluster = argumentParser.get("cluster");
log.info("cluster is {}", cluster);
final String index = argumentParser.get("index");
log.info("index is {}", index);
final String clusterJson = IOUtils
.toString(
Objects
.requireNonNull(
IndexFeed.class.getResourceAsStream("/eu/dnetlib/sx/provision/cluster.json")));
@SuppressWarnings("unchecked")
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
if (!clusterMap.containsKey(cluster)) {
throw new RuntimeException(
String.format("Cluster %s not found, expected values cluster1, cluster2", cluster));
}
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
new IndexFeed(fileSystem).run(sourcePath, clusterMap.get(cluster), index);
}
public void run(final String sourcePath, final String host, final String index) throws Exception {
RemoteIterator<LocatedFileStatus> ls = fileSystem.listFiles(new Path(sourcePath), false);
final List<FeedWorker> workers = new ArrayList<>();
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3000);
for (String currentHost : host.split(",")) {
workers
.add(
new FeedWorker(fileSystem, queue, RequestManagerFactory.fromType(index), currentHost.trim(),
index));
}
workers.forEach(Thread::start);
while (ls.hasNext()) {
LocatedFileStatus current = ls.next();
if (current.getPath().getName().endsWith(".gz")) {
queue.put(current.getPath().toString());
}
}
for (FeedWorker worker : workers) {
try {
queue.put(FeedWorker.JOB_COMPLETE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
worker.join();
}
}
}

View File

@ -0,0 +1,4 @@
{
"cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
"cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
}

View File

@ -0,0 +1,27 @@
[
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the name node param",
"paramRequired": true
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "The source Path",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "cluster",
"paramDescription": "The cluster name",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "index",
"paramDescription": "The index name",
"paramRequired": true
}
]

View File

@ -0,0 +1,14 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,62 @@
<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the sourcePath of the json RDDs</description>
</property>
<property>
<name>esCluster</name>
<description>the Index cluster</description>
</property>
</parameters>
<start to="indexSummary"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
<arg>-namenode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/summary_json</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
<arg>--index</arg><arg>summary</arg>
</java>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
<arg>-namenode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/scholix</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
<arg>--index</arg><arg>scholix</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,63 @@
{
"settings": {
"index": {
"number_of_shards": "140",
"number_of_replicas": "0",
"refresh_interval": "-1",
"translog.flush_threshold_size": "2048MB",
"codec": "best_compression"
}
},
"mappings": {
"properties": {
"identifier": {
"type": "keyword"
},
"linkProviders": {
"type": "text"
},
"publicationDate": {
"type": "date"
},
"relationType": {
"type": "keyword"
},
"sourceId": {
"type": "keyword"
},
"sourcePid": {
"type": "keyword"
},
"sourcePidType": {
"type": "keyword"
},
"sourcePublisher": {
"type": "text"
},
"sourceSubType": {
"type": "keyword"
},
"sourceType": {
"type": "keyword"
},
"targetId": {
"type": "keyword"
},
"targetPid": {
"type": "keyword"
},
"targetPidType": {
"type": "keyword"
},
"targetPublisher": {
"type": "text"
},
"targetSubType": {
"type": "keyword"
},
"targetType": {
"type": "keyword"
}
}
}
}

View File

@ -0,0 +1,19 @@
{
"settings": {
"index": {
"number_of_shards": "5",
"number_of_replicas": "0",
"refresh_interval": "-1",
"translog.flush_threshold_size": "2048MB",
"codec": "best_compression"
}
},
"mappings": {
"properties": {
"body": {
"type": "text",
"index": false
}
}
}
}

View File

@ -0,0 +1,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;
import eu.dnetlib.sx.index.feeder.IndexFeed;
public class IndexPathTest {
@Test
public void testIndexing() throws Exception {
Configuration conf = new Configuration();
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fs = FileSystem.get(conf);
new IndexFeed(fs).run("file:///Users/sandro/Downloads/scholix/summary_js", "localhost", "summary");
}
}

View File

@ -0,0 +1,11 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.logger.org = ERROR
log4j.logger.eu.dnetlib = DEBUG
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

View File

@ -43,6 +43,7 @@
<module>dhp-doiboost</module> <module>dhp-doiboost</module>
<module>dhp-impact-indicators</module> <module>dhp-impact-indicators</module>
<module>dhp-swh</module> <module>dhp-swh</module>
<module>dhp-scholix-provision</module>
</modules> </modules>
<pluginRepositories> <pluginRepositories>

View File

@ -960,7 +960,7 @@
<commons.logging.version>1.1.3</commons.logging.version> <commons.logging.version>1.1.3</commons.logging.version>
<commons-validator.version>1.7</commons-validator.version> <commons-validator.version>1.7</commons-validator.version>
<dateparser.version>1.0.7</dateparser.version> <dateparser.version>1.0.7</dateparser.version>
<dhp-schemas.version>[6.1.2]</dhp-schemas.version> <dhp-schemas.version>[6.1.3-FLAT-SCHOLIX]</dhp-schemas.version>
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version> <dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version> <dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<dhp.guava.version>11.0.2</dhp.guava.version> <dhp.guava.version>11.0.2</dhp.guava.version>