Compare commits: main...scholix_sm

3 Commits

| Author | SHA1 | Date |
|---|---|---|
| Sandro La Bruzzo | 2a7e5de094 | |
| Sandro La Bruzzo | 844a31f7a6 | |
| Sandro La Bruzzo | 0c934d3c39 | |
@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty}
+import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
 import eu.dnetlib.dhp.schema.sx.scholix.{
   Scholix,
   ScholixCollectedFrom,
@@ -10,6 +11,7 @@ import eu.dnetlib.dhp.schema.sx.scholix.{
   ScholixRelationship,
   ScholixResource
 }
+import org.apache.logging.log4j.core.appender.ConsoleAppender.Target
 import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
@@ -26,6 +28,16 @@ case class RelationInfo(
 ) {}
 case class RelKeyValue(key: String, value: String) {}

+case class SummaryResource(
+  id: String,
+  typology: String,
+  subType: String,
+  pids: List[String],
+  pidTypes: List[String],
+  publishers: List[String],
+  date: String
+) {}
+
 object ScholexplorerUtils {

   val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier"
@@ -86,6 +98,99 @@ object ScholexplorerUtils {
       .toList
   }

+  def generateSummaryResource(input: ScholixResource): SummaryResource = {
+    val distinctIds = if (input.getIdentifier != null) {
+      input.getIdentifier.asScala.map(i => i.getIdentifier).distinct.toList
+    } else List()
+    val distinctTypes = if (input.getIdentifier != null) {
+      input.getIdentifier.asScala.map(i => i.getSchema).distinct.toList
+    } else List()
+    val distinctPublishers = if (input.getPublisher != null) {
+      input.getPublisher.asScala.map(i => i.getName).distinct.sorted.take(5).toList
+    } else List()
+    SummaryResource(
+      id = input.getDnetIdentifier,
+      typology = input.getObjectType,
+      subType = input.getObjectSubType,
+      pids = distinctIds,
+      pidTypes = distinctTypes,
+      publishers = distinctPublishers,
+      date = input.getPublicationDate
+    )
+  }
+
+  def generateScholixFlat(relation: RelationInfo, summary: SummaryResource, updateSource: Boolean): ScholixFlat = {
+    val scholix = new ScholixFlat
+    scholix.setIdentifier(relation.id)
+    if (relation.collectedfrom != null && relation.collectedfrom.nonEmpty)
+      scholix.setLinkProviders(
+        relation.collectedfrom
+          .map(cf => {
+            cf.value
+          })
+          .distinct
+          .sorted
+          .take(5)
+          .toList
+          .asJava
+      )
+    else {
+      scholix.setLinkProviders(List("OpenAIRE").asJava)
+    }
+    val semanticRelation = relations.getOrElse(relation.relclass.toLowerCase, null)
+    if (semanticRelation == null)
+      return null
+
+    scholix.setRelationType(semanticRelation.original)
+    scholix.setPublicationDate(summary.date)
+    if (updateSource) {
+      if (summary.pids.isEmpty)
+        return null
+      scholix.setSourceId(summary.id)
+      scholix.setSourcePid(summary.pids.asJava)
+      scholix.setSourcePidType(summary.pidTypes.asJava)
+      scholix.setSourceType(summary.typology)
+      scholix.setSourceSubType(summary.subType)
+      if (summary.publishers.nonEmpty) {
+        scholix.setSourcePublisher(summary.publishers.asJava)
+      }
+    } else {
+      if (summary.pids.isEmpty)
+        return null
+      scholix.setTargetId(summary.id)
+      scholix.setTargetPid(summary.pids.asJava)
+      scholix.setTargetPidType(summary.pidTypes.asJava)
+      scholix.setTargetType(summary.typology)
+      scholix.setTargetSubType(summary.subType)
+      if (summary.publishers.nonEmpty) {
+        scholix.setTargetPublisher(summary.publishers.asJava)
+      }
+    }
+    scholix
+  }
+
+  def mergeScholixFlat(source: ScholixFlat, target: ScholixFlat): ScholixFlat = {
+    if (source.getPublicationDate == null) {
+      source.setPublicationDate(target.getPublicationDate)
+    }
+
+    source.setTargetId(target.getTargetId)
+    source.setTargetPid(target.getTargetPid)
+    source.setTargetPidType(target.getTargetPidType)
+    source.setTargetType(target.getTargetType)
+    source.setTargetSubType(target.getTargetSubType)
+
+    if (source.getLinkProviders != null)
+      source.setTargetPublisher(target.getTargetPublisher)
+    else if (source.getLinkProviders != null && target.getLinkProviders != null) {
+      source.setLinkProviders(
+        source.getLinkProviders.asScala.union(target.getLinkProviders.asScala).sorted.distinct.take(5).asJava
+      )
+    }
+
+    source
+  }
+
   def generateScholixResourceFromResult(result: Result): ScholixResource = {

     if (result.getInstance() == null || result.getInstance().size() == 0)
@@ -10,6 +10,7 @@ import eu.dnetlib.dhp.schema.oaf.{
   Software,
   Dataset => OafDataset
 }
+import eu.dnetlib.dhp.schema.sx.scholix.flat.ScholixFlat
 import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
 import org.apache.spark.sql.functions.{col, concat, expr, first, md5}
 import org.apache.spark.sql.types.StructType
@@ -29,7 +30,7 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo
     log.info("targetPath: {}", targetPath)
     generateBidirectionalRelations(sourcePath, targetPath, spark)
     generateScholixResource(sourcePath, targetPath, spark)
-    generateScholix(targetPath, spark)
+    generateFlatScholix(targetPath, spark)
   }

   def generateScholixResource(inputPath: String, outputPath: String, spark: SparkSession): Unit = {
@@ -101,6 +102,47 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo

   }

+  def generateFlatScholix(outputPath: String, spark: SparkSession): Unit = {
+    import spark.implicits._
+    implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
+    implicit val scholixEncoder: Encoder[ScholixFlat] = Encoders.bean(classOf[ScholixFlat])
+    val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo]
+    val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource]
+
+    resource
+      .map(s => ScholexplorerUtils.generateSummaryResource(s))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$outputPath/summary")
+    val summaries = spark.read.load(s"$outputPath/summary").as[SummaryResource]
+
+    relations
+      .joinWith(summaries, relations("source") === summaries("id"))
+      .map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, true))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$outputPath/scholix_source")
+
+    val scholix_source = spark.read.load(s"$outputPath/scholix_source").as[ScholixFlat]
+
+    relations
+      .joinWith(summaries, relations("target") === summaries("id"))
+      .map(k => ScholexplorerUtils.generateScholixFlat(k._1, k._2, false))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$outputPath/scholix_target")
+
+    val scholix_target = spark.read.load(s"$outputPath/scholix_target").as[ScholixFlat]
+
+    scholix_source
+      .joinWith(scholix_target, scholix_source("identifier") === scholix_target("identifier"), "inner")
+      .map(s => ScholexplorerUtils.mergeScholixFlat(s._1, s._2))
+      .write
+      .mode(SaveMode.Overwrite)
+      .option("compression", "gzip")
+      .json(s"$outputPath/scholix")
+  }
+
   def generateScholix(outputPath: String, spark: SparkSession): Unit = {
     implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
     implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix])
@@ -14,13 +14,15 @@ class ScholixGenerationTest {

   val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
   val app = new SparkCreateScholexplorerDump(null, null, null)
-  // app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark)
-  // app.generateBidirectionalRelations(
-  //   "/home/sandro/Downloads/scholix_sample/",
-  //   "/home/sandro/Downloads/scholix/",
-  //   spark
-  // )
-  app.generateScholix("/home/sandro/Downloads/scholix/", spark)
+  val basePath = "/Users/sandro/Downloads"
+  app.generateScholixResource(s"$basePath/scholix_sample/", s"$basePath/scholix/", spark)
+  app.generateBidirectionalRelations(
+    s"$basePath/scholix_sample/",
+    s"$basePath/scholix/",
+    spark
+  )
+  app.generateFlatScholix(s"$basePath/scholix/", spark)

   }
 }
@@ -14,7 +14,7 @@
         </property>
     </parameters>

-    <start to="DropAndCreateIndex"/>
+    <start to="indexSummary"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -42,7 +42,7 @@
            <jar>dhp-graph-provision-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
-                --conf spark.dynamicAllocation.maxExecutors="8"
+                --conf spark.dynamicAllocation.maxExecutors="16"
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@@ -50,10 +50,10 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            </spark-opts>
            <arg>--master</arg><arg>yarn</arg>
-            <arg>--sourcePath</arg><arg>${sourcePath}/summaries_json</arg>
-            <arg>--index</arg><arg>${index}_object</arg>
+            <arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/summary_json</arg>
+            <arg>--index</arg><arg>summary</arg>
            <arg>--idPath</arg><arg>id</arg>
-            <arg>--cluster</arg><arg>${esCluster}</arg>
+            <arg>--cluster</arg><arg>cluster1</arg>
        </spark>
        <ok to="indexScholix"/>
        <error to="Kill"/>
@@ -68,7 +68,7 @@
            <jar>dhp-graph-provision-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
-                --conf spark.dynamicAllocation.maxExecutors="8"
+                --conf spark.dynamicAllocation.maxExecutors="48"
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@@ -76,10 +76,10 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            </spark-opts>
            <arg>--master</arg><arg>yarn</arg>
-            <arg>--sourcePath</arg><arg>${sourcePath}/scholix_json</arg>
-            <arg>--index</arg><arg>${index}_scholix</arg>
+            <arg>--sourcePath</arg><arg>/user/sandro.labruzzo/scholix_new/scholix</arg>
+            <arg>--index</arg><arg>scholix</arg>
            <arg>--idPath</arg><arg>identifier</arg>
-            <arg>--cluster</arg><arg>${esCluster}</arg>
+            <arg>--cluster</arg><arg>cluster1</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <artifactId>dhp-workflows</artifactId>
+        <version>1.2.5-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dhp-scholix-provision</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${dhp.jackson.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>7.6.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.woodstox</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.github.ben-manes.caffeine</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.lmax</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.tdunning</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>ant</artifactId>
+                    <groupId>org.apache.ant</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>antlr4-runtime</artifactId>
+                    <groupId>org.antlr</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>woodstox-core</artifactId>
+                    <groupId>com.fasterxml.woodstox</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>*</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>1.2.5-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
@@ -0,0 +1,17 @@
+
+package eu.dnetlib.sx.index;
+
+import org.elasticsearch.action.index.IndexRequest;
+
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+
+public abstract class RequestManager {
+
+    String extractIdentifier(final String json, final String jpath) {
+        DocumentContext jsonContext = JsonPath.parse(json);
+        return jsonContext.read(jpath);
+    }
+
+    public abstract IndexRequest createRequest(final String line, final String indexName);
+}
@@ -0,0 +1,13 @@
+
+package eu.dnetlib.sx.index;
+
+public class RequestManagerFactory {
+    public static RequestManager fromType(final String type) {
+        if ("scholix".equalsIgnoreCase(type))
+            return new ScholixRequestManager();
+        if ("summary".equalsIgnoreCase(type))
+            return new SummaryRequestManager();
+        throw new IllegalArgumentException("unexpected type");
+    }
+
+}
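Side note, not part of the change set: a minimal sketch of how the factory and a RequestManager are meant to be combined with the Elasticsearch high-level client, essentially what the new FeedWorker further down does in bulk. The host, index name and sample JSON line are invented for the example.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import eu.dnetlib.sx.index.RequestManager;
import eu.dnetlib.sx.index.RequestManagerFactory;

public class RequestManagerUsageSketch {
    public static void main(String[] args) throws Exception {
        // hypothetical host; in the workflow the hosts come from cluster.json
        try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // "scholix" selects ScholixRequestManager, "summary" selects SummaryRequestManager
            final RequestManager manager = RequestManagerFactory.fromType("scholix");

            // invented record: each input line is one JSON document
            final String line = "{\"identifier\":\"rel::0001\",\"relationType\":\"References\"}";

            final BulkRequest bulk = new BulkRequest();
            bulk.add(manager.createRequest(line, "scholix"));
            client.bulk(bulk, RequestOptions.DEFAULT);
        }
    }
}
```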
@@ -0,0 +1,19 @@
+
+package eu.dnetlib.sx.index;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+public class ScholixRequestManager extends RequestManager {
+
+    final static String identifierPath = "$.identifier";
+
+    @Override
+    public IndexRequest createRequest(final String line, final String indexName) {
+        return new IndexRequest()
+            .index(indexName)
+            .id(extractIdentifier(line, identifierPath))
+            .source(line, XContentType.JSON);
+
+    }
+}
@@ -0,0 +1,36 @@
+
+package eu.dnetlib.sx.index;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SummaryRequestManager extends RequestManager {
+
+    final static String identifierPath = "$.id";
+    final static ObjectMapper mapper = new ObjectMapper();
+
+    private String constructSource(String line) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("body", line);
+        try {
+            return mapper.writeValueAsString(params);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    @Override
+    public IndexRequest createRequest(String line, String indexName) {
+        return new IndexRequest()
+            .index(indexName)
+            .id(extractIdentifier(line, identifierPath))
+            .source(constructSource(line), XContentType.JSON);
+    }
+}
@@ -0,0 +1,109 @@
+
+package eu.dnetlib.sx.index.feeder;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.sx.index.RequestManager;
+
+public class FeedWorker extends Thread {
+
+    private final FileSystem fileSystem;
+
+    public static String JOB_COMPLETE = "JOB_COMPLETE";
+
+    private final BlockingQueue<String> queue;
+
+    private final Logger log = LoggerFactory.getLogger(getClass().getName());
+
+    private boolean hasComplete = false;
+    private final String index;
+
+    private final RequestManager requestCreator;
+
+    private final RestHighLevelClient client;
+
+    public FeedWorker(
+        FileSystem fileSystem, BlockingQueue<String> queue,
+        RequestManager requestCreator, final String host, final String index) {
+        this.fileSystem = fileSystem;
+        this.queue = queue;
+        this.index = index;
+        this.requestCreator = requestCreator;
+        this.client = createRESTClient(host);
+    }
+
+    private RestHighLevelClient createRESTClient(String host) {
+        System.out.println("Creating client with host = " + host);
+
+        return new RestHighLevelClient(
+            RestClient.builder(new HttpHost(host, 9200, "http")));
+    }
+
+    @Override
+    public void run() {
+        CompressionCodecFactory factory = new CompressionCodecFactory(fileSystem.getConf());
+        while (!hasComplete) {
+            try {
+                final String nextItem = queue.take();
+                if (JOB_COMPLETE.equalsIgnoreCase(nextItem)) {
+                    hasComplete = true;
+                } else {
+                    System.out.println("Parsing " + nextItem + "\n");
+                    final Path currentPath = new Path(nextItem);
+
+                    final CompressionCodec codec = factory.getCodec(currentPath);
+                    InputStream gzipInputStream = codec.createInputStream(fileSystem.open(currentPath));
+                    Reader decoder = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
+                    BufferedReader reader = new BufferedReader(decoder);
+                    doIndexChunk(reader);
+                }
+
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    private BulkRequest createRequest() {
+        final BulkRequest request = new BulkRequest();
+        request.timeout("2m");
+        return request;
+
+    }
+
+    private void doIndexChunk(final BufferedReader reader) throws Exception {
+        String next;
+        BulkRequest request = createRequest();
+        int i = 0;
+        while ((next = reader.readLine()) != null) {
+            request.add(this.requestCreator.createRequest(next, index));
+            if (i++ % 10000 == 0) {
+                client.bulk(request, RequestOptions.DEFAULT);
+                request = createRequest();
+                System.out.printf("Bulk-> %d items \n", i);
+                log.debug("Bulk-> {} items ", i);
+            }
+        }
+        client.bulk(request, RequestOptions.DEFAULT);
+        System.out.printf(" Final Bulk-> %d items \n", i);
+        log.debug("Final Bulk-> {} items ", i);
+    }
+}
@@ -0,0 +1,106 @@
+
+package eu.dnetlib.sx.index.feeder;
+
+import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.sx.index.RequestManagerFactory;
+
+public class IndexFeed {
+
+    private final FileSystem fileSystem;
+
+    private final static Logger log = LoggerFactory.getLogger(IndexFeed.class);
+
+    public IndexFeed(FileSystem fileSystem) {
+        this.fileSystem = fileSystem;
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    Objects
+                        .requireNonNull(
+                            IndexFeed.class
+                                .getResourceAsStream(
+                                    "/eu/dnetlib/sx/provision/feed_index_params.json"))));
+        argumentParser.parseArgument(args);
+
+        final String hdfsuri = argumentParser.get("namenode");
+        log.info("hdfsURI is {}", hdfsuri);
+
+        final String sourcePath = argumentParser.get("sourcePath");
+        log.info("sourcePath is {}", sourcePath);
+
+        final String cluster = argumentParser.get("cluster");
+        log.info("cluster is {}", cluster);
+
+        final String index = argumentParser.get("index");
+        log.info("index is {}", index);
+
+        final String clusterJson = IOUtils
+            .toString(
+                Objects
+                    .requireNonNull(
+                        IndexFeed.class.getResourceAsStream("/eu/dnetlib/sx/provision/cluster.json")));
+
+        @SuppressWarnings("unchecked")
+        final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
+
+        if (!clusterMap.containsKey(cluster)) {
+            throw new RuntimeException(
+                String.format("Cluster %s not found, expected values cluster1, cluster2", cluster));
+        }
+
+        final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
+
+        new IndexFeed(fileSystem).run(sourcePath, clusterMap.get(cluster), index);
+    }
+
+    public void run(final String sourcePath, final String host, final String index) throws Exception {
+        RemoteIterator<LocatedFileStatus> ls = fileSystem.listFiles(new Path(sourcePath), false);
+        final List<FeedWorker> workers = new ArrayList<>();
+        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3000);
+        for (String currentHost : host.split(",")) {
+            workers
+                .add(
+                    new FeedWorker(fileSystem, queue, RequestManagerFactory.fromType(index), currentHost.trim(),
+                        index));
+        }
+        workers.forEach(Thread::start);
+        while (ls.hasNext()) {
+            LocatedFileStatus current = ls.next();
+            if (current.getPath().getName().endsWith(".gz")) {
+                queue.put(current.getPath().toString());
+            }
+        }
+
+        for (FeedWorker worker : workers) {
+            try {
+                queue.put(FeedWorker.JOB_COMPLETE);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            worker.join();
+        }
+    }
+}
@@ -0,0 +1,4 @@
+{
+  "cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
+  "cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
+}
@@ -0,0 +1,27 @@
+[
+  {
+    "paramName": "n",
+    "paramLongName": "namenode",
+    "paramDescription": "the name node param",
+    "paramRequired": true
+  },
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "The source Path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "c",
+    "paramLongName": "cluster",
+    "paramDescription": "The cluster name",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "index",
+    "paramDescription": "The index name",
+    "paramRequired": true
+  }
+
+]
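Side note, not part of the change set: how these parameters line up with an IndexFeed invocation, using the same flags the new workflow.xml passes further down. The namenode URI and source path here are hypothetical.

```java
import eu.dnetlib.sx.index.feeder.IndexFeed;

public class IndexFeedLaunchSketch {
    public static void main(String[] args) throws Exception {
        // one flag per entry of feed_index_params.json; values are invented
        IndexFeed.main(new String[] {
            "-namenode", "hdfs://nameservice1",                  // hypothetical HDFS namenode URI
            "--sourcePath", "/user/example/scholix_new/scholix", // hypothetical folder of .gz JSON parts
            "--cluster", "cluster1",                             // must be a key of cluster.json
            "--index", "scholix"                                 // picks ScholixRequestManager via the factory
        });
    }
}
```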
@@ -0,0 +1,14 @@
+<configuration>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
@@ -0,0 +1,62 @@
+<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the sourcePath of the json RDDs</description>
+        </property>
+        <property>
+            <name>esCluster</name>
+            <description>the Index cluster</description>
+        </property>
+    </parameters>
+
+    <start to="indexSummary"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="DropAndCreateIndex">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex</main-class>
+            <arg>-i</arg><arg>${index}</arg>
+            <arg>-c</arg><arg>${esCluster}</arg>
+        </java>
+        <ok to="indexSummary"/>
+        <error to="Kill"/>
+    </action>
+
+
+    <action name="indexSummary">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
+            <arg>-namenode</arg><arg>${nameNode}</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}/summary_json</arg>
+            <arg>--cluster</arg><arg>${esCluster}</arg>
+            <arg>--index</arg><arg>summary</arg>
+        </java>
+        <ok to="indexScholix"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="indexScholix">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.sx.index.feeder.IndexFeed</main-class>
+            <arg>-namenode</arg><arg>${nameNode}</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}/scholix</arg>
+            <arg>--cluster</arg><arg>${esCluster}</arg>
+            <arg>--index</arg><arg>scholix</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+
+    <end name="End"/>
+</workflow-app>
@@ -0,0 +1,63 @@
+{
+  "settings": {
+    "index": {
+      "number_of_shards": "140",
+      "number_of_replicas": "0",
+      "refresh_interval": "-1",
+      "translog.flush_threshold_size": "2048MB",
+      "codec": "best_compression"
+    }
+  },
+  "mappings": {
+    "properties": {
+      "identifier": {
+        "type": "keyword"
+      },
+      "linkProviders": {
+        "type": "text"
+      },
+      "publicationDate": {
+        "type": "date"
+      },
+      "relationType": {
+        "type": "keyword"
+      },
+      "sourceId": {
+        "type": "keyword"
+      },
+      "sourcePid": {
+        "type": "keyword"
+      },
+      "sourcePidType": {
+        "type": "keyword"
+      },
+      "sourcePublisher": {
+        "type": "text"
+      },
+      "sourceSubType": {
+        "type": "keyword"
+      },
+      "sourceType": {
+        "type": "keyword"
+      },
+      "targetId": {
+        "type": "keyword"
+      },
+      "targetPid": {
+        "type": "keyword"
+      },
+      "targetPidType": {
+        "type": "keyword"
+      },
+      "targetPublisher": {
+        "type": "text"
+      },
+      "targetSubType": {
+        "type": "keyword"
+      },
+      "targetType": {
+        "type": "keyword"
+      }
+    }
+  }
+}
@@ -0,0 +1,19 @@
+{
+  "settings": {
+    "index": {
+      "number_of_shards": "5",
+      "number_of_replicas": "0",
+      "refresh_interval": "-1",
+      "translog.flush_threshold_size": "2048MB",
+      "codec": "best_compression"
+    }
+  },
+  "mappings": {
+    "properties": {
+      "body": {
+        "type": "text",
+        "index": false
+      }
+    }
+  }
+}
@@ -0,0 +1,20 @@
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.sx.index.feeder.IndexFeed;
+
+public class IndexPathTest {
+
+    @Test
+    public void testIndexing() throws Exception {
+        Configuration conf = new Configuration();
+        // Because of Maven
+        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+        FileSystem fs = FileSystem.get(conf);
+        new IndexFeed(fs).run("file:///Users/sandro/Downloads/scholix/summary_js", "localhost", "summary");
+
+    }
+
+}
@@ -0,0 +1,11 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.logger.org = ERROR
+log4j.logger.eu.dnetlib = DEBUG
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
@@ -43,6 +43,7 @@
        <module>dhp-doiboost</module>
        <module>dhp-impact-indicators</module>
        <module>dhp-swh</module>
+        <module>dhp-scholix-provision</module>
    </modules>

    <pluginRepositories>
pom.xml
@@ -960,7 +960,7 @@
        <commons.logging.version>1.1.3</commons.logging.version>
        <commons-validator.version>1.7</commons-validator.version>
        <dateparser.version>1.0.7</dateparser.version>
-        <dhp-schemas.version>[6.1.2]</dhp-schemas.version>
+        <dhp-schemas.version>[6.1.3-FLAT-SCHOLIX]</dhp-schemas.version>
        <dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
        <dhp.commons.lang.version>3.5</dhp.commons.lang.version>
        <dhp.guava.version>11.0.2</dhp.guava.version>