Compare commits
3 Commits
main
...
scholix_sm
Author | SHA1 | Date |
---|---|---|
Sandro La Bruzzo | 2a7e5de094 | |
Sandro La Bruzzo | 844a31f7a6 | |
Sandro La Bruzzo | 0c934d3c39 |
|
@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
|
||||
import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
|
||||
import eu.dnetlib.dhp.schema.sx.scholix.{
|
||||
Scholix,
|
||||
ScholixCollectedFrom,
|
||||
|
@ -10,6 +11,7 @@ import eu.dnetlib.dhp.schema.sx.scholix.{
|
|||
ScholixRelationship,
|
||||
ScholixResource
|
||||
}
|
||||
import org.apache.logging.log4j.core.appender.ConsoleAppender.Target
|
||||
import org.json4s
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
|
@ -26,6 +28,16 @@ case class RelationInfo(
|
|||
) {}
|
||||
case class RelKeyValue(key: String, value: String) {}
|
||||
|
||||
case class SummaryResource(
|
||||
id: String,
|
||||
typology: String,
|
||||
subType: String,
|
||||
pids: List[String],
|
||||
pidTypes: List[String],
|
||||
publishers: List[String],
|
||||
date: String
|
||||
) {}
|
||||
|
||||
object ScholexplorerUtils {
|
||||
|
||||
val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
|
||||
|
@ -86,6 +98,99 @@ object ScholexplorerUtils {
|
|||
.toList
|
||||
}
|
||||
|
||||
def generateSummaryResource(input: ScholixResource): SummaryResource = {
|
||||
val distinctIds = if (input.getIdentifier != null) {
|
||||
input.getIdentifier.asScala.map(i => i.getIdentifier).distinct.toList
|
||||
} else List()
|
||||
val distinctTypes = if (input.getIdentifier != null) {
|
||||
input.getIdentifier.asScala.map(i => i.getSchema).distinct.toList
|
||||
} else List()
|
||||
val distinctPublishers = if (input.getPublisher != null) {
|
||||
input.getPublisher.asScala.map(i => i.getName).distinct.sorted.take(5).toList
|
||||
} else List()
|
||||
SummaryResource(
|
||||
id = input.getDnetIdentifier,
|
||||
typology = input.getObjectType,
|
||||
subType = input.getObjectSubType,
|
||||
pids = distinctIds,
|
||||
pidTypes = distinctTypes,
|
||||
publishers = distinctPublishers,
|
||||
date = input.getPublicationDate
|
||||
)
|
||||
}
|
||||
|
||||
def generateScholixFlat(relation: RelationInfo, summary: SummaryResource, updateSource: Boolean): ScholixFlat = {
|
||||
val scholix = new ScholixFlat
|
||||
scholix.setIdentifier(relation.id)
|
||||
if (relation.collectedfrom != null && relation.collectedfrom.nonEmpty)
|
||||
scholix.setLinkProviders(
|
||||
relation.collectedfrom
|
||||
.map(cf => {
|
||||
cf.value
|
||||
})
|
||||
.distinct
|
||||
.sorted
|
||||
.take(5)
|
||||
.toList
|
||||
.asJava
|
||||
)
|
||||
else {
|
||||
scholix.setLinkProviders(List("OpenAIRE").asJava)
|
||||
}
|
||||
val semanticRelation = relations.getOrElse(relation.relclass.toLowerCase, null)
|
||||
if (semanticRelation == null)
|
||||
return null
|
||||
|
||||
scholix.setRelationType(semanticRelation.original)
|
||||
scholix.setPublicationDate(summary.date)
|
||||
if (updateSource) {
|
||||
if (summary.pids.isEmpty)
|
||||
return null
|
||||
scholix.setSourceId(summary.id)
|
||||
scholix.setSourcePid(summary.pids.asJava)
|
||||
scholix.setSourcePidType(summary.pidTypes.asJava)
|
||||
scholix.setSourceType(summary.typology)
|
||||
scholix.setSourceSubType(summary.subType)
|
||||
if (summary.publishers.nonEmpty) {
|
||||
scholix.setSourcePublisher(summary.publishers.asJava)
|
||||
}
|
||||
} else {
|
||||
if (summary.pids.isEmpty)
|
||||
return null
|
||||
scholix.setTargetId(summary.id)
|
||||
scholix.setTargetPid(summary.pids.asJava)
|
||||
scholix.setTargetPidType(summary.pidTypes.asJava)
|
||||
scholix.setTargetType(summary.typology)
|
||||
scholix.setTargetSubType(summary.subType)
|
||||
if (summary.publishers.nonEmpty) {
|
||||
scholix.setTargetPublisher(summary.publishers.asJava)
|
||||
}
|
||||
}
|
||||
scholix
|
||||
}
|
||||
|
||||
def mergeScholixFlat(source: ScholixFlat, target: ScholixFlat): ScholixFlat = {
|
||||
if (source.getPublicationDate == null) {
|
||||
source.setPublicationDate(target.getPublicationDate)
|
||||
}
|
||||
|
||||
source.setTargetId(target.getTargetId)
|
||||
source.setTargetPid(target.getTargetPid)
|
||||
source.setTargetPidType(target.getTargetPidType)
|
||||
source.setTargetType(target.getTargetType)
|
||||
source.setTargetSubType(target.getTargetSubType)
|
||||
|
||||
if (source.getLinkProviders != null)
|
||||
source.setTargetPublisher(target.getTargetPublisher)
|
||||
else if (source.getLinkProviders != null && target.getLinkProviders != null) {
|
||||
source.setLinkProviders(
|
||||
source.getLinkProviders.asScala.union(target.getLinkProviders.asScala).sorted.distinct.take(5).asJava
|
||||
)
|
||||
}
|
||||
|
||||
source
|
||||
}
|
||||
|
||||
def generateScholixResourceFromResult(result: Result): ScholixResource = {
|
||||
|
||||
if (result.getInstance() == null || result.getInstance().size() == 0)
|
||||
|
|
|
@ -10,6 +10,7 @@ import eu.dnetlib.dhp.schema.oaf.{
|
|||
Software,
|
||||
Dataset => OafDataset
|
||||
}
|
||||
import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
|
||||
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
|
||||
import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
@ -29,7 +30,7 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
|
|||
log.info("targetPath: {}", targetPath)
|
||||
generateBidirectionalRelations(sourcePath, targetPath, spark)
|
||||
generateScholixResource(sourcePath, targetPath, spark)
|
||||
generateScholix(targetPath, spark)
|
||||
generateFlatScholix(targetPath, spark)
|
||||
}
|
||||
|
||||
def generateScholixResource(inputPath: String, outputPath: String, spark: SparkSession): Unit = {
|
||||
|
@ -101,6 +102,47 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
|
|||
|
||||
}
|
||||
|
||||
def generateFlatScholix(outputPath: String, spark: SparkSession): Unit = {
|
||||
import spark.implicits._
|
||||
implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
|
||||
implicit val scholixEncoder: Encoder[ScholixFlat] = Encoders.bean(classOf[ScholixFlat])
|
||||
val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
|
||||
val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource]
|
||||
|
||||
resource
|
||||
.map(s => ScholexplorerUtils.generateSummaryResource(s))
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$outputPath/summary")
|
||||
val summaries = spark.read.load(s"$outputPath/summary").as[SummaryResource]
|
||||
|
||||
relations
|
||||
.joinWith(summaries, relations("source") === summaries("id"))
|
||||
.map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, true))
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$outputPath/scholix_source")
|
||||
|
||||
val scholix_source = spark.read.load(s"$outputPath/scholix_source").as[ScholixFlat]
|
||||
|
||||
relations
|
||||
.joinWith(summaries, relations("target") === summaries("id"))
|
||||
.map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, false))
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$outputPath/scholix_target")
|
||||
|
||||
val scholix_target = spark.read.load(s"$outputPath/scholix_target").as[ScholixFlat]
|
||||
|
||||
scholix_source
|
||||
.joinWith(scholix_target, scholix_source("identifier") === scholix_target("identifier"), "inner")
|
||||
.map(s => ScholexplorerUtils.mergeScholixFlat(s._1, s._2))
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(s"$outputPath/scholix")
|
||||
}
|
||||
|
||||
def generateScholix(outputPath: String, spark: SparkSession): Unit = {
|
||||
implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
|
||||
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])
|
||||
|
|
|
@ -14,13 +14,15 @@ class ScholixGenerationTest {
|
|||
|
||||
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
|
||||
val app = new SparkCreateScholexplorerDump(null, null, null)
|
||||
// app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
|
||||
// app.generateBidirectionalRelations(
|
||||
// "/home/sandro/Downloads/scholix_sample/",
|
||||
// "/home/sandro/Downloads/scholix/",
|
||||
// spark
|
||||
// )
|
||||
app.generateScholix("/home/sandro/Downloads/scholix/", spark)
|
||||
|
||||
val basePath = "/Users/sandro/Downloads"
|
||||
app.generateScholixResource(s"$basePath/scholix_sample/", s"$basePath/scholix/", spark)
|
||||
app.generateBidirectionalRelations(
|
||||
s"$basePath/scholix_sample/",
|
||||
s"$basePath/scholix/",
|
||||
spark
|
||||
)
|
||||
app.generateFlatScholix(s"$basePath/scholix/", spark)
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="DropAndCreateIndex"/>
|
||||
<start to="indexSummary"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
|
@ -42,7 +42,7 @@
|
|||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--conf spark.dynamicAllocation.maxExecutors="8"
|
||||
--conf spark.dynamicAllocation.maxExecutors="16"
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
|
@ -50,10 +50,10 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--master</arg><arg>yarn</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/summaries_json</arg>
|
||||
<arg>--index</arg><arg>${index}_object</arg>
|
||||
<arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/summary_json</arg>
|
||||
<arg>--index</arg><arg>summary</arg>
|
||||
<arg>--idPath</arg><arg>id</arg>
|
||||
<arg>--cluster</arg><arg>${esCluster}</arg>
|
||||
<arg>--cluster</arg><arg>cluster1</arg>
|
||||
</spark>
|
||||
<ok to="indexScholix"/>
|
||||
<error to="Kill"/>
|
||||
|
@ -68,7 +68,7 @@
|
|||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--conf spark.dynamicAllocation.maxExecutors="8"
|
||||
--conf spark.dynamicAllocation.maxExecutors="48"
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
|
@ -76,10 +76,10 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--master</arg><arg>yarn</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/scholix_json</arg>
|
||||
<arg>--index</arg><arg>${index}_scholix</arg>
|
||||
<arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/scholix</arg>
|
||||
<arg>--index</arg><arg>scholix</arg>
|
||||
<arg>--idPath</arg><arg>identifier</arg>
|
||||
<arg>--cluster</arg><arg>${esCluster}</arg>
|
||||
<arg>--cluster</arg><arg>cluster1</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-workflows</artifactId>
|
||||
<version>1.2.5-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>dhp-scholix-provision</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${dhp.jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch.client</groupId>
|
||||
<artifactId>elasticsearch-rest-high-level-client</artifactId>
|
||||
<version>7.6.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.janino</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.codehaus.woodstox</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.lmax</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.tdunning</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>ant</artifactId>
|
||||
<groupId>org.apache.ant</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>antlr4-runtime</artifactId>
|
||||
<groupId>org.antlr</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>woodstox-core</artifactId>
|
||||
<groupId>com.fasterxml.woodstox</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>log4j</artifactId>
|
||||
<groupId>*</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
<artifactId>json-path</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-schemas</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>1.2.5-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,17 @@
|
|||
|
||||
package eu.dnetlib.sx.index;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
|
||||
public abstract class RequestManager {
|
||||
|
||||
String extractIdentifier(final String json, final String jpath) {
|
||||
DocumentContext jsonContext = JsonPath.parse(json);
|
||||
return jsonContext.read(jpath);
|
||||
}
|
||||
|
||||
public abstract IndexRequest createRequest(final String line, final String indexName);
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
|
||||
package eu.dnetlib.sx.index;
|
||||
|
||||
public class RequestManagerFactory {
|
||||
public static RequestManager fromType(final String type) {
|
||||
if ("scholix".equalsIgnoreCase(type))
|
||||
return new ScholixRequestManager();
|
||||
if ("summary".equalsIgnoreCase(type))
|
||||
return new SummaryRequestManager();
|
||||
throw new IllegalArgumentException("unexpected type");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
|
||||
package eu.dnetlib.sx.index;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
public class ScholixRequestManager extends RequestManager {
|
||||
|
||||
final static String identifierPath = "$.identifier";
|
||||
|
||||
@Override
|
||||
public IndexRequest createRequest(final String line, final String indexName) {
|
||||
return new IndexRequest()
|
||||
.index(indexName)
|
||||
.id(extractIdentifier(line, identifierPath))
|
||||
.source(line, XContentType.JSON);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
|
||||
package eu.dnetlib.sx.index;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class SummaryRequestManager extends RequestManager {
|
||||
|
||||
final static String identifierPath = "$.id";
|
||||
final static ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private String constructSource(String line) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("body", line);
|
||||
try {
|
||||
return mapper.writeValueAsString(params);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexRequest createRequest(String line, String indexName) {
|
||||
return new IndexRequest()
|
||||
.index(indexName)
|
||||
.id(extractIdentifier(line, identifierPath))
|
||||
.source(constructSource(line), XContentType.JSON);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
|
||||
package eu.dnetlib.sx.index.feeder;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.sx.index.RequestManager;
|
||||
|
||||
public class FeedWorker extends Thread {
|
||||
|
||||
private final FileSystem fileSystem;
|
||||
|
||||
public static String JOB_COMPLETE = "JOB_COMPLETE";
|
||||
|
||||
private final BlockingQueue<String> queue;
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(getClass().getName());
|
||||
|
||||
private boolean hasComplete = false;
|
||||
private final String index;
|
||||
|
||||
private final RequestManager requestCreator;
|
||||
|
||||
private final RestHighLevelClient client;
|
||||
|
||||
public FeedWorker(
|
||||
FileSystem fileSystem, BlockingQueue<String> queue,
|
||||
RequestManager requestCreator, final String host, final String index) {
|
||||
this.fileSystem = fileSystem;
|
||||
this.queue = queue;
|
||||
this.index = index;
|
||||
this.requestCreator = requestCreator;
|
||||
this.client = createRESTClient(host);
|
||||
}
|
||||
|
||||
private RestHighLevelClient createRESTClient(String host) {
|
||||
System.out.println("Creating client with host = " + host);
|
||||
|
||||
return new RestHighLevelClient(
|
||||
RestClient.builder(new HttpHost(host, 9200, "http")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
|
||||
while (!hasComplete) {
|
||||
try {
|
||||
final String nextItem = queue.take();
|
||||
if (JOB_COMPLETE.equalsIgnoreCase(nextItem)) {
|
||||
hasComplete = true;
|
||||
} else {
|
||||
System.out.println("Parsing " + nextItem + "\n");
|
||||
final Path currentPath = new Path(nextItem);
|
||||
|
||||
final CompressionCodec codec = factory.getCodec(currentPath);
|
||||
InputStream gzipInputStream = codec.createInputStream(fileSystem.open(currentPath));
|
||||
Reader decoder = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
|
||||
BufferedReader reader = new BufferedReader(decoder);
|
||||
doIndexChunk(reader);
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private BulkRequest createRequest() {
|
||||
final BulkRequest request = new BulkRequest();
|
||||
request.timeout("2m");
|
||||
return request;
|
||||
|
||||
}
|
||||
|
||||
private void doIndexChunk(final BufferedReader reader) throws Exception {
|
||||
String next;
|
||||
BulkRequest request = createRequest();
|
||||
int i = 0;
|
||||
while ((next = reader.readLine()) != null) {
|
||||
request.add(this.requestCreator.createRequest(next, index));
|
||||
if (i++ % 10000 == 0) {
|
||||
client.bulk(request, RequestOptions.DEFAULT);
|
||||
request = createRequest();
|
||||
System.out.printf("Bulk-> %d items \n", i);
|
||||
log.debug("Bulk-> {} items ", i);
|
||||
}
|
||||
}
|
||||
client.bulk(request, RequestOptions.DEFAULT);
|
||||
System.out.printf(" Final Bulk-> %d items \n", i);
|
||||
log.debug("Final Bulk-> {} items ", i);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
|
||||
package eu.dnetlib.sx.index.feeder;
|
||||
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.sx.index.RequestManagerFactory;
|
||||
|
||||
public class IndexFeed {
|
||||
|
||||
private final FileSystem fileSystem;
|
||||
|
||||
private final static Logger log = LoggerFactory.getLogger(IndexFeed.class);
|
||||
|
||||
public IndexFeed(FileSystem fileSystem) {
|
||||
this.fileSystem = fileSystem;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
IndexFeed.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/sx/provision/feed_index_params.json"))));
|
||||
argumentParser.parseArgument(args);
|
||||
|
||||
final String hdfsuri = argumentParser.get("namenode");
|
||||
log.info("hdfsURI is {}", hdfsuri);
|
||||
|
||||
final String sourcePath = argumentParser.get("sourcePath");
|
||||
log.info("sourcePath is {}", sourcePath);
|
||||
|
||||
final String cluster = argumentParser.get("cluster");
|
||||
log.info("cluster is {}", cluster);
|
||||
|
||||
final String index = argumentParser.get("index");
|
||||
log.info("index is {}", index);
|
||||
|
||||
final String clusterJson = IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
IndexFeed.class.getResourceAsStream("/eu/dnetlib/sx/provision/cluster.json")));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
|
||||
|
||||
if (!clusterMap.containsKey(cluster)) {
|
||||
throw new RuntimeException(
|
||||
String.format("Cluster %s not found, expected values cluster1, cluster2", cluster));
|
||||
}
|
||||
|
||||
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
|
||||
|
||||
new IndexFeed(fileSystem).run(sourcePath, clusterMap.get(cluster), index);
|
||||
}
|
||||
|
||||
public void run(final String sourcePath, final String host, final String index) throws Exception {
|
||||
RemoteIterator<LocatedFileStatus> ls = fileSystem.listFiles(new Path(sourcePath), false);
|
||||
final List<FeedWorker> workers = new ArrayList<>();
|
||||
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3000);
|
||||
for (String currentHost : host.split(",")) {
|
||||
workers
|
||||
.add(
|
||||
new FeedWorker(fileSystem, queue, RequestManagerFactory.fromType(index), currentHost.trim(),
|
||||
index));
|
||||
}
|
||||
workers.forEach(Thread::start);
|
||||
while (ls.hasNext()) {
|
||||
LocatedFileStatus current = ls.next();
|
||||
if (current.getPath().getName().endsWith(".gz")) {
|
||||
queue.put(current.getPath().toString());
|
||||
}
|
||||
}
|
||||
|
||||
for (FeedWorker worker : workers) {
|
||||
try {
|
||||
queue.put(FeedWorker.JOB_COMPLETE);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
worker.join();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
{
|
||||
"cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
|
||||
"cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
[
|
||||
{
|
||||
"paramName": "n",
|
||||
"paramLongName": "namenode",
|
||||
"paramDescription": "the name node param",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "The source Path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "c",
|
||||
"paramLongName": "cluster",
|
||||
"paramDescription": "The cluster name",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "index",
|
||||
"paramDescription": "The index name",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
||||
]
|
|
@ -0,0 +1,14 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,62 @@
|
|||
<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sourcePath</name>
|
||||
<description>the sourcePath of the json RDDs</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>esCluster</name>
|
||||
<description>the Index cluster</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="indexSummary"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="DropAndCreateIndex">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex</main-class>
|
||||
<arg>-i</arg><arg>${index}</arg>
|
||||
<arg>-c</arg><arg>${esCluster}</arg>
|
||||
</java>
|
||||
<ok to="indexSummary"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="indexSummary">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
|
||||
<arg>-namenode</arg><arg>${nameNode}</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/summary_json</arg>
|
||||
<arg>--cluster</arg><arg>${esCluster}</arg>
|
||||
<arg>--index</arg><arg>summary</arg>
|
||||
</java>
|
||||
<ok to="indexScholix"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="indexScholix">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
|
||||
<arg>-namenode</arg><arg>${nameNode}</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/scholix</arg>
|
||||
<arg>--cluster</arg><arg>${esCluster}</arg>
|
||||
<arg>--index</arg><arg>scholix</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,63 @@
|
|||
{
|
||||
"settings": {
|
||||
"index": {
|
||||
"number_of_shards": "140",
|
||||
"number_of_replicas": "0",
|
||||
"refresh_interval": "-1",
|
||||
"translog.flush_threshold_size": "2048MB",
|
||||
"codec": "best_compression"
|
||||
}
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"identifier": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"linkProviders": {
|
||||
"type": "text"
|
||||
},
|
||||
"publicationDate": {
|
||||
"type": "date"
|
||||
},
|
||||
"relationType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"sourceId": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"sourcePid": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"sourcePidType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"sourcePublisher": {
|
||||
"type": "text"
|
||||
},
|
||||
"sourceSubType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"sourceType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"targetId": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"targetPid": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"targetPidType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"targetPublisher": {
|
||||
"type": "text"
|
||||
},
|
||||
"targetSubType": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"targetType": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"settings": {
|
||||
"index": {
|
||||
"number_of_shards": "5",
|
||||
"number_of_replicas": "0",
|
||||
"refresh_interval": "-1",
|
||||
"translog.flush_threshold_size": "2048MB",
|
||||
"codec": "best_compression"
|
||||
}
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"body": {
|
||||
"type": "text",
|
||||
"index": false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import eu.dnetlib.sx.index.feeder.IndexFeed;
|
||||
|
||||
public class IndexPathTest {
|
||||
|
||||
@Test
|
||||
public void testIndexing() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
// Because of Maven
|
||||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
new IndexFeed(fs).run("file:///Users/sandro/Downloads/scholix/summary_js", "localhost", "summary");
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
# Set root logger level to DEBUG and its only appender to A1.
|
||||
log4j.rootLogger=INFO, A1
|
||||
|
||||
# A1 is set to be a ConsoleAppender.
|
||||
log4j.appender.A1=org.apache.log4j.ConsoleAppender
|
||||
|
||||
# A1 uses PatternLayout.
|
||||
log4j.logger.org = ERROR
|
||||
log4j.logger.eu.dnetlib = DEBUG
|
||||
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
|
@ -43,6 +43,7 @@
|
|||
<module>dhp-doiboost</module>
|
||||
<module>dhp-impact-indicators</module>
|
||||
<module>dhp-swh</module>
|
||||
<module>dhp-scholix-provision</module>
|
||||
</modules>
|
||||
|
||||
<pluginRepositories>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -960,7 +960,7 @@
|
|||
<commons.logging.version>1.1.3</commons.logging.version>
|
||||
<commons-validator.version>1.7</commons-validator.version>
|
||||
<dateparser.version>1.0.7</dateparser.version>
|
||||
<dhp-schemas.version>[6.1.2]</dhp-schemas.version>
|
||||
<dhp-schemas.version>[6.1.3-FLAT-SCHOLIX]</dhp-schemas.version>
|
||||
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
|
||||
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
||||
<dhp.guava.version>11.0.2</dhp.guava.version>
|
||||
|
|
Loading…
Reference in New Issue