Compare commits
4 Commits
main
...
scholix_fl
Author | SHA1 | Date |
---|---|---|
Sandro La Bruzzo | 4f07dba68f | |
Sandro La Bruzzo | ab1842e5dc | |
Sandro La Bruzzo | ef82b8362d | |
Sandro La Bruzzo | 20c395fd86 |
|
@ -1,14 +1,23 @@
|
||||||
package eu.dnetlib.dhp.sx.graph.scholix
|
package eu.dnetlib.dhp.sx.graph.scholix
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result, StructuredProperty}
|
import eu.dnetlib.dhp.schema.oaf.{
|
||||||
|
Dataset,
|
||||||
|
OtherResearchProduct,
|
||||||
|
Publication,
|
||||||
|
Relation,
|
||||||
|
Result,
|
||||||
|
Software,
|
||||||
|
StructuredProperty
|
||||||
|
}
|
||||||
import eu.dnetlib.dhp.schema.sx.scholix._
|
import eu.dnetlib.dhp.schema.sx.scholix._
|
||||||
import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology}
|
import eu.dnetlib.dhp.schema.sx.summary.{AuthorPid, CollectedFromType, SchemeValue, ScholixSummary, Typology}
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
import org.apache.spark.sql.expressions.Aggregator
|
import org.apache.spark.sql.expressions.Aggregator
|
||||||
import org.apache.spark.sql.{Encoder, Encoders}
|
import org.apache.spark.sql.{Encoder, Encoders}
|
||||||
import org.json4s
|
import org.json4s
|
||||||
import org.json4s.DefaultFormats
|
import org.json4s.DefaultFormats
|
||||||
import org.json4s.jackson.JsonMethods.parse
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.io.Source
|
import scala.io.Source
|
||||||
|
|
||||||
|
@ -59,6 +68,36 @@ object ScholixUtils extends Serializable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Projects a Scholix record onto its flat representation.
  *
  * Identifiers, PID values and schemas, object types, publisher names and link
  * provider names are copied into top-level fields; every list of names or PIDs is
  * de-duplicated. The serialized record is carried along unchanged in the blob.
  *
  * @param input the Scholix record to flatten
  * @param json  the serialized form of `input`, stored as-is via `setBlob`
  * @return the flat record, or `null` when the source or the target is missing or
  *         has no identifier list
  */
def flattenizeScholix(input: Scholix, json: String): ScholixFlat = {
  val source = input.getSource
  val target = input.getTarget

  // Both endpoints must expose an identifier list; otherwise the record is discarded.
  if (source == null || source.getIdentifier == null || target == null || target.getIdentifier == null)
    null
  else {
    val flat: ScholixFlat = new ScholixFlat
    flat.setIdentifier(input.getIdentifier)
    flat.setRelationType(input.getRelationship.getName)

    flat.setSourceId(source.getDnetIdentifier)
    flat.setSourcePid(source.getIdentifier.asScala.map(_.getIdentifier).distinct.toList.asJava)
    flat.setSourcePidType(source.getIdentifier.asScala.map(_.getSchema).distinct.toList.asJava)
    flat.setSourceType(source.getObjectType)
    flat.setSourceSubType(source.getObjectSubType)
    // De-duplicated like every other list field of the flat record.
    if (source.getPublisher != null)
      flat.setSourcePublisher(source.getPublisher.asScala.map(_.getName).distinct.toList.asJava)

    flat.setTargetId(target.getDnetIdentifier)
    flat.setTargetPid(target.getIdentifier.asScala.map(_.getIdentifier).distinct.toList.asJava)
    flat.setTargetPidType(target.getIdentifier.asScala.map(_.getSchema).distinct.toList.asJava)
    flat.setTargetType(target.getObjectType)
    flat.setTargetSubType(target.getObjectSubType)
    if (target.getPublisher != null)
      flat.setTargetPublisher(target.getPublisher.asScala.map(_.getName).distinct.toList.asJava)

    flat.setPublicationDate(input.getPublicationDate)
    if (input.getLinkprovider != null)
      flat.setLinkProviders(input.getLinkprovider.asScala.map(_.getName).distinct.toList.asJava)

    flat.setBlob(json)
    flat
  }
}
|
||||||
|
|
||||||
def inverseRelationShip(rel: ScholixRelationship): ScholixRelationship = {
|
def inverseRelationShip(rel: ScholixRelationship): ScholixRelationship = {
|
||||||
new ScholixRelationship(rel.getInverse, rel.getSchema, rel.getName)
|
new ScholixRelationship(rel.getInverse, rel.getSchema, rel.getName)
|
||||||
|
|
||||||
|
@ -232,7 +271,16 @@ object ScholixUtils extends Serializable {
|
||||||
|
|
||||||
if (summaryObject.getAuthor != null && !summaryObject.getAuthor.isEmpty) {
|
if (summaryObject.getAuthor != null && !summaryObject.getAuthor.isEmpty) {
|
||||||
val l: List[ScholixEntityId] =
|
val l: List[ScholixEntityId] =
|
||||||
summaryObject.getAuthor.asScala.map(a => new ScholixEntityId(a, null)).toList
|
summaryObject.getAuthor.asScala
|
||||||
|
.map(a => {
|
||||||
|
if (a.getORCID != null)
|
||||||
|
new ScholixEntityId(
|
||||||
|
a.getFullname,
|
||||||
|
List(new ScholixIdentifier(a.getORCID, "ORCID", s"https://orcid.org/${a.getORCID}")).asJava
|
||||||
|
)
|
||||||
|
else new ScholixEntityId(a.getFullname, null)
|
||||||
|
})
|
||||||
|
.toList
|
||||||
if (l.nonEmpty)
|
if (l.nonEmpty)
|
||||||
r.setCreator(l.asJava)
|
r.setCreator(l.asJava)
|
||||||
}
|
}
|
||||||
|
@ -377,10 +425,13 @@ object ScholixUtils extends Serializable {
|
||||||
if (persistentIdentifiers.isEmpty)
|
if (persistentIdentifiers.isEmpty)
|
||||||
return null
|
return null
|
||||||
s.setLocalIdentifier(persistentIdentifiers.asJava)
|
s.setLocalIdentifier(persistentIdentifiers.asJava)
|
||||||
if (r.isInstanceOf[Publication])
|
r match {
|
||||||
s.setTypology(Typology.publication)
|
case _: Publication => s.setTypology(Typology.publication)
|
||||||
else
|
case _: Dataset => s.setTypology(Typology.dataset)
|
||||||
s.setTypology(Typology.dataset)
|
case _: Software => s.setTypology(Typology.software)
|
||||||
|
case _: OtherResearchProduct => s.setTypology(Typology.otherresearchproduct)
|
||||||
|
case _ =>
|
||||||
|
}
|
||||||
|
|
||||||
s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)
|
s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)
|
||||||
|
|
||||||
|
@ -393,7 +444,20 @@ object ScholixUtils extends Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (r.getAuthor != null && !r.getAuthor.isEmpty) {
|
if (r.getAuthor != null && !r.getAuthor.isEmpty) {
|
||||||
val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname).toList
|
val authors: List[AuthorPid] = r.getAuthor.asScala
|
||||||
|
.map(a => {
|
||||||
|
var ORCID: String = null
|
||||||
|
if (a.getPid != null) {
|
||||||
|
val result = a.getPid.asScala.find(p =>
|
||||||
|
p.getQualifier != null && p.getQualifier.getClassid != null && p.getQualifier.getClassid.toLowerCase
|
||||||
|
.contains("orcid")
|
||||||
|
)
|
||||||
|
if (result.isDefined)
|
||||||
|
ORCID = result.get.getValue
|
||||||
|
}
|
||||||
|
new AuthorPid(a.getFullname, ORCID)
|
||||||
|
})
|
||||||
|
.toList
|
||||||
if (authors.nonEmpty)
|
if (authors.nonEmpty)
|
||||||
s.setAuthor(authors.asJava)
|
s.setAuthor(authors.asJava)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.sx.graph.scholix;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.Scholix;
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.ScholixFlat;
|
||||||
|
|
||||||
|
public class ScholixFlatTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void flattenScholixTest() throws IOException {
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
InputStream gzipStream = new GZIPInputStream(getClass().getResourceAsStream("scholix_records.gz"));
|
||||||
|
Reader decoder = new InputStreamReader(gzipStream, "UTF-8");
|
||||||
|
BufferedReader buffered = new BufferedReader(decoder);
|
||||||
|
String line;
|
||||||
|
FileWriter myWriter = new FileWriter("/Users/sandro/Downloads/records");
|
||||||
|
while ((line = buffered.readLine()) != null) {
|
||||||
|
final Scholix s = mapper.readValue(line, Scholix.class);
|
||||||
|
final ScholixFlat flat = ScholixUtils.flattenizeScholix(s, line);
|
||||||
|
assertNotNull(s);
|
||||||
|
assertNotNull(flat);
|
||||||
|
assertEquals(s.getIdentifier(), flat.getIdentifier());
|
||||||
|
assertEquals(s.getRelationship().getName(), flat.getRelationType());
|
||||||
|
assertEquals(s.getSource().getObjectType(), flat.getSourceType());
|
||||||
|
assertEquals(s.getSource().getObjectSubType(), flat.getSourceSubType());
|
||||||
|
myWriter.write(mapper.writeValueAsString(flat));
|
||||||
|
myWriter.write("\n");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
myWriter.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
|
@ -0,0 +1,91 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.sx.graph.dump;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.*;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.api.MissingConceptDoiException;
|
||||||
|
import eu.dnetlib.dhp.common.api.ZenodoAPIClient;
|
||||||
|
|
||||||
|
public class SendToZenodoHDFS implements Serializable {
|
||||||
|
|
||||||
|
private static final String NEW = "new"; // to be used for a brand new deposition in zenodo
|
||||||
|
private static final String VERSION = "version"; // to be used to upload a new version of a published deposition
|
||||||
|
private static final String UPDATE = "update"; // to upload content to an open deposition not published
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception, MissingConceptDoiException {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SendToZenodoHDFS.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/sx/graph/dump/upload_zenodo.json")));
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String hdfsPath = parser.get("hdfsPath");
|
||||||
|
final String hdfsNameNode = parser.get("nameNode");
|
||||||
|
final String access_token = parser.get("accessToken");
|
||||||
|
final String connection_url = parser.get("connectionUrl");
|
||||||
|
final String metadata = parser.get("metadata");
|
||||||
|
final String depositionType = parser.get("depositionType");
|
||||||
|
final String concept_rec_id = Optional
|
||||||
|
.ofNullable(parser.get("conceptRecordId"))
|
||||||
|
.orElse(null);
|
||||||
|
|
||||||
|
final String depositionId = Optional.ofNullable(parser.get("depositionId")).orElse(null);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
|
.listFiles(
|
||||||
|
new Path(hdfsPath), true);
|
||||||
|
ZenodoAPIClient zenodoApiClient = new ZenodoAPIClient(connection_url, access_token);
|
||||||
|
switch (depositionType) {
|
||||||
|
case NEW:
|
||||||
|
zenodoApiClient.newDeposition();
|
||||||
|
break;
|
||||||
|
case VERSION:
|
||||||
|
if (concept_rec_id == null) {
|
||||||
|
throw new MissingConceptDoiException("No concept record id has been provided");
|
||||||
|
}
|
||||||
|
zenodoApiClient.newVersion(concept_rec_id);
|
||||||
|
break;
|
||||||
|
case UPDATE:
|
||||||
|
if (depositionId == null) {
|
||||||
|
throw new MissingConceptDoiException("No deposition id has been provided");
|
||||||
|
}
|
||||||
|
zenodoApiClient.uploadOpenDeposition(depositionId);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new RuntimeException("No available entries");
|
||||||
|
}
|
||||||
|
|
||||||
|
while (fileStatusListIterator.hasNext()) {
|
||||||
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
|
||||||
|
Path p = fileStatus.getPath();
|
||||||
|
String pString = p.toString();
|
||||||
|
if (!pString.endsWith("_SUCCESS")) {
|
||||||
|
String name = pString.substring(pString.lastIndexOf("/") + 1);
|
||||||
|
|
||||||
|
FSDataInputStream inputStream = fileSystem.open(p);
|
||||||
|
// zenodoApiClient.uploadIS(inputStream, name);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
if (!metadata.equals("")) {
|
||||||
|
zenodoApiClient.sendMretadata(metadata);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "dt",
|
||||||
|
"paramLongName": "depositionType",
|
||||||
|
"paramDescription": "the type of the deposition (new, version, update)",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "cri",
|
||||||
|
"paramLongName": "conceptRecordId",
|
||||||
|
"paramDescription": "The id of the concept record for a new version",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "di",
|
||||||
|
"paramLongName": "depositionId",
|
||||||
|
"paramDescription": "the id of an open deposition which has not been published",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "hdfsp",
|
||||||
|
"paramLongName": "hdfsPath",
|
||||||
|
"paramDescription": "the path of the folder to find files to send to Zenodo",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "nn",
|
||||||
|
"paramLongName": "nameNode",
|
||||||
|
"paramDescription": "the name node",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "at",
|
||||||
|
"paramLongName": "accessToken",
|
||||||
|
"paramDescription": "the access token for the deposition",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "cu",
|
||||||
|
"paramLongName": "connectionUrl",
|
||||||
|
"paramDescription": "the url to connect to deposit",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "m",
|
||||||
|
"paramLongName": "metadata",
|
||||||
|
"paramDescription": "metadata associated to the deposition",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
|
]
|
|
@ -8,148 +8,36 @@
|
||||||
<name>targetPath</name>
|
<name>targetPath</name>
|
||||||
<description>the final graph path</description>
|
<description>the final graph path</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
|
||||||
<name>relationFilter</name>
|
|
||||||
<description>Filter relation semantic</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>maxNumberOfPid</name>
|
|
||||||
<description>filter relation with at least #maxNumberOfPid</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>dumpCitations</name>
|
|
||||||
<value>false</value>
|
|
||||||
<description>should dump citation relations</description>
|
|
||||||
</property>
|
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="ImportDatasetEntities"/>
|
<start to="make_tar"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<action name="ImportDatasetEntities">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Import JSONRDD to Dataset kryo</name>
|
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.shuffle.partitions=3000
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--master</arg><arg>yarn</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
|
||||||
<arg>--targetPath</arg><arg>${targetPath}</arg>
|
|
||||||
<arg>--filterRelation</arg><arg>${relationFilter}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="CreateSummaries"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
|
|
||||||
<action name="CreateSummaries">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Convert Entities to summaries</name>
|
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkCreateSummaryObject</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.shuffle.partitions=20000
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--master</arg><arg>yarn</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${targetPath}/entities</arg>
|
|
||||||
<arg>--targetPath</arg><arg>${targetPath}/provision/summaries</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="CreateScholix"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="CreateScholix">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Generate Scholix Dataset</name>
|
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkCreateScholix</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.shuffle.partitions=30000
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--master</arg><arg>yarn</arg>
|
|
||||||
<arg>--summaryPath</arg><arg>${targetPath}/provision/summaries</arg>
|
|
||||||
<arg>--targetPath</arg><arg>${targetPath}/provision/scholix</arg>
|
|
||||||
<arg>--relationPath</arg><arg>${targetPath}/relation</arg>
|
|
||||||
<arg>--dumpCitations</arg><arg>${dumpCitations}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="DropJSONPath"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="DropJSONPath">
|
|
||||||
<fs>
|
|
||||||
<delete path='${targetPath}/json'/>
|
|
||||||
<mkdir path='${targetPath}/json/'/>
|
|
||||||
</fs>
|
|
||||||
<ok to="SerializeScholix"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="SerializeScholix">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Serialize scholix to JSON</name>
|
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.shuffle.partitions=6000
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--master</arg><arg>yarn</arg>
|
|
||||||
<arg>--sourcePath</arg><arg>${targetPath}/provision/scholix/scholix</arg>
|
|
||||||
<arg>--targetPath</arg><arg>${targetPath}/json/scholix_json</arg>
|
|
||||||
<arg>--objectType</arg><arg>scholix</arg>
|
|
||||||
<arg>--maxPidNumberFilter</arg><arg>maxNumberOfPid</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="make_tar"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="make_tar">
|
<action name="make_tar">
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.dhp.common.MakeTarArchive</main-class>
|
<main-class>eu.dnetlib.dhp.common.MakeTarArchive</main-class>
|
||||||
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--hdfsPath</arg><arg>${targetPath}</arg>
|
||||||
|
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="send_zenodo"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="send_zenodo">
|
||||||
|
<java>
|
||||||
|
<main-class>eu.dnetlib.dhp.sx.graph.dump.SendToZenodoHDFS</main-class>
|
||||||
<arg>--hdfsPath</arg><arg>${targetPath}/tar</arg>
|
<arg>--hdfsPath</arg><arg>${targetPath}/tar</arg>
|
||||||
<arg>--sourcePath</arg><arg>${targetPath}/json</arg>
|
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--accessToken</arg><arg>${accessToken}</arg>
|
||||||
|
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
|
||||||
|
<arg>--metadata</arg><arg>${metadata}</arg>
|
||||||
|
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
|
||||||
|
<arg>--depositionType</arg><arg>${depositionType}</arg>
|
||||||
|
<arg>--depositionId</arg><arg>${depositionId}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class DropAndCreateESIndex {
|
||||||
Objects
|
Objects
|
||||||
.requireNonNull(
|
.requireNonNull(
|
||||||
DropAndCreateESIndex.class
|
DropAndCreateESIndex.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json")));
|
||||||
|
|
||||||
log.info("creating Index SCHOLIX");
|
log.info("creating Index SCHOLIX");
|
||||||
final HttpPut put = new HttpPut(String.format(url, ip, index, "scholix"));
|
final HttpPut put = new HttpPut(String.format(url, ip, index, "scholix"));
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.sx.provision;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.Scholix;
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.ScholixResource;
|
||||||
|
|
||||||
|
public class ScholixFlat {
|
||||||
|
private static ObjectMapper MAPPER = new ObjectMapper();
|
||||||
|
private List<String> linkProvider = new ArrayList<>();
|
||||||
|
|
||||||
|
private String publicationDate;
|
||||||
|
|
||||||
|
private List<String> sourceLinkPublisher = new ArrayList<>();
|
||||||
|
private List<String> targetLinkPublisher = new ArrayList<>();
|
||||||
|
|
||||||
|
private String sourceDnetIdentifier;
|
||||||
|
private String targetDnetIdentifier;
|
||||||
|
private List<String> sourcePids = new ArrayList<>();
|
||||||
|
private List<String> sourcePidTypes = new ArrayList<>();
|
||||||
|
private List<String> targetPids = new ArrayList<>();
|
||||||
|
private List<String> targetPidTypes = new ArrayList<>();
|
||||||
|
|
||||||
|
private String json;
|
||||||
|
|
||||||
|
public void addLinkProvider(final String providerName) {
|
||||||
|
addStringToList(providerName, this.linkProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSourceLinkPublisher(final String linkPublisher) {
|
||||||
|
addStringToList(linkPublisher, sourceLinkPublisher);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addTargetLinkPublisher(final String linkPublisher) {
|
||||||
|
addStringToList(linkPublisher, targetLinkPublisher);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSourcePid(final String pid) {
|
||||||
|
addStringToList(pid, sourcePids);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addSourcePidType(final String pidType) {
|
||||||
|
addStringToList(pidType, sourcePidTypes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addTargetPidType(final String pidType) {
|
||||||
|
addStringToList(pidType, targetPidTypes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addTargetPid(final String pid) {
|
||||||
|
addStringToList(pid, targetPids);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addStringToList(final String s, final List<String> l) {
|
||||||
|
if (l != null && !l.contains(s))
|
||||||
|
l.add(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourceDnetIdentifier() {
|
||||||
|
return sourceDnetIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSourceDnetIdentifier(String sourceDnetIdentifier) {
|
||||||
|
this.sourceDnetIdentifier = sourceDnetIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetDnetIdentifier() {
|
||||||
|
return targetDnetIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetDnetIdentifier(String targetDnetIdentifier) {
|
||||||
|
this.targetDnetIdentifier = targetDnetIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getSourcePids() {
|
||||||
|
return sourcePids;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSourcePids(List<String> sourcePids) {
|
||||||
|
this.sourcePids = sourcePids;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getSourcePidTypes() {
|
||||||
|
return sourcePidTypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSourcePidTypes(List<String> sourcePidTypes) {
|
||||||
|
this.sourcePidTypes = sourcePidTypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getTargetPids() {
|
||||||
|
return targetPids;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetPids(List<String> targetPids) {
|
||||||
|
this.targetPids = targetPids;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getTargetPidTypes() {
|
||||||
|
return targetPidTypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetPidTypes(List<String> targetPidTypes) {
|
||||||
|
this.targetPidTypes = targetPidTypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getSourceLinkPublisher() {
|
||||||
|
return sourceLinkPublisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSourceLinkPublisher(List<String> sourceLinkPublisher) {
|
||||||
|
this.sourceLinkPublisher = sourceLinkPublisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getTargetLinkPublisher() {
|
||||||
|
return targetLinkPublisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetLinkPublisher(List<String> targetLinkPublisher) {
|
||||||
|
this.targetLinkPublisher = targetLinkPublisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getLinkProvider() {
|
||||||
|
return linkProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLinkProvider(List<String> linkProvider) {
|
||||||
|
this.linkProvider = linkProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPublicationDate() {
|
||||||
|
return publicationDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPublicationDate(String publicationDate) {
|
||||||
|
this.publicationDate = publicationDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJson() {
|
||||||
|
return json;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJson(String json) {
|
||||||
|
this.json = json;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ScholixFlat fromScholix(final Scholix scholix) throws JsonProcessingException {
|
||||||
|
if (scholix == null || scholix.getSource() == null || scholix.getTarget() == null)
|
||||||
|
return null;
|
||||||
|
final ScholixFlat flat = new ScholixFlat();
|
||||||
|
if (scholix.getLinkprovider() != null)
|
||||||
|
scholix.getLinkprovider().forEach(l -> flat.addLinkProvider(l.getName()));
|
||||||
|
|
||||||
|
flat.setPublicationDate(scholix.getPublicationDate());
|
||||||
|
|
||||||
|
final ScholixResource source = scholix.getSource();
|
||||||
|
flat.setSourceDnetIdentifier(source.getDnetIdentifier());
|
||||||
|
if (source.getIdentifier() != null) {
|
||||||
|
source.getIdentifier().forEach(i -> {
|
||||||
|
flat.addSourcePid(i.getIdentifier());
|
||||||
|
flat.addSourcePidType(i.getSchema());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (source.getPublisher() != null) {
|
||||||
|
source.getPublisher().forEach(p -> flat.addSourceLinkPublisher(p.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
final ScholixResource target = scholix.getSource();
|
||||||
|
flat.setTargetDnetIdentifier(target.getDnetIdentifier());
|
||||||
|
if (target.getIdentifier() != null) {
|
||||||
|
target.getIdentifier().forEach(i -> {
|
||||||
|
flat.addTargetPid(i.getIdentifier());
|
||||||
|
flat.addTargetPidType(i.getSchema());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (target.getPublisher() != null) {
|
||||||
|
target.getPublisher().forEach(p -> flat.addTargetLinkPublisher(p.getName()));
|
||||||
|
}
|
||||||
|
flat.setJson(MAPPER.writeValueAsString(scholix));
|
||||||
|
return flat;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -42,7 +42,8 @@ public class SparkIndexCollectionOnES {
|
||||||
.toString(
|
.toString(
|
||||||
Objects
|
Objects
|
||||||
.requireNonNull(
|
.requireNonNull(
|
||||||
DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
|
SparkIndexCollectionOnES.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
|
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="DropAndCreateIndex"/>
|
<start to="indexScholix"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
{
|
||||||
|
"mappings": {
|
||||||
|
"properties": {
|
||||||
|
"blob": {
|
||||||
|
"type": "binary",
|
||||||
|
"index": false
|
||||||
|
},
|
||||||
|
"identifier": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"linkProviders": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"publicationDate": {
|
||||||
|
"type": "date"
|
||||||
|
},
|
||||||
|
"relationType": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"sourceId": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"sourcePid": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"sourcePidType": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"sourcePublisher": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"sourceSubType": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"sourceType": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"targetId": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"targetPid": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"targetPidType": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"targetPublisher": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"targetSubType": {
|
||||||
|
"type": "keyword"
|
||||||
|
},
|
||||||
|
"targetType": {
|
||||||
|
"type": "keyword"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"settings": {
|
||||||
|
"index": {
|
||||||
|
"refresh_interval": "600s",
|
||||||
|
"number_of_shards": "48",
|
||||||
|
"translog": {
|
||||||
|
"sync_interval": "15s",
|
||||||
|
"durability": "ASYNC"
|
||||||
|
},
|
||||||
|
"analysis": {
|
||||||
|
"analyzer": {
|
||||||
|
"analyzer_keyword": {
|
||||||
|
"filter": "lowercase",
|
||||||
|
"tokenizer": "keyword"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"number_of_replicas": "0"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,146 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.sx;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex.APPLICATION_JSON;
|
||||||
|
import static eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex.STATUS_CODE_TEXT;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.Base64;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
import java.util.zip.GZIPOutputStream;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Base64InputStream;
|
||||||
|
import org.apache.commons.codec.binary.Base64OutputStream;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
|
import org.apache.http.client.methods.HttpDelete;
|
||||||
|
import org.apache.http.client.methods.HttpPut;
|
||||||
|
import org.apache.http.entity.StringEntity;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClients;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.Scholix;
|
||||||
|
import eu.dnetlib.dhp.schema.sx.scholix.ScholixFlat;
|
||||||
|
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils;
|
||||||
|
import eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex;
|
||||||
|
import eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnES;
|
||||||
|
|
||||||
|
public class FlatIndexTest {
|
||||||
|
@Test
|
||||||
|
public void dropAndCreateIndex() {
|
||||||
|
|
||||||
|
Logger log = LoggerFactory.getLogger(getClass().getName());
|
||||||
|
|
||||||
|
final String url = "http://localhost:9200/dli_scholix";
|
||||||
|
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
|
||||||
|
HttpDelete delete = new HttpDelete(url);
|
||||||
|
|
||||||
|
CloseableHttpResponse response = client.execute(delete);
|
||||||
|
|
||||||
|
log.info("deleting Index SCHOLIX");
|
||||||
|
log.info(STATUS_CODE_TEXT, response.getStatusLine());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
|
||||||
|
final String scholixConf = IOUtils
|
||||||
|
.toString(
|
||||||
|
Objects
|
||||||
|
.requireNonNull(
|
||||||
|
DropAndCreateESIndex.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index_flat.json")));
|
||||||
|
|
||||||
|
log.info("creating Index SCHOLIX");
|
||||||
|
final HttpPut put = new HttpPut(url);
|
||||||
|
|
||||||
|
final StringEntity entity = new StringEntity(scholixConf);
|
||||||
|
put.setEntity(entity);
|
||||||
|
put.setHeader("Accept", APPLICATION_JSON);
|
||||||
|
put.setHeader("Content-type", APPLICATION_JSON);
|
||||||
|
|
||||||
|
final CloseableHttpResponse response = client.execute(put);
|
||||||
|
log.info(STATUS_CODE_TEXT, response.getStatusLine());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String compressString(final String input) {
|
||||||
|
try {
|
||||||
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||||
|
Base64OutputStream b64os = new Base64OutputStream(os);
|
||||||
|
GZIPOutputStream gzip = new GZIPOutputStream(b64os);
|
||||||
|
gzip.write(input.getBytes(StandardCharsets.UTF_8));
|
||||||
|
gzip.close();
|
||||||
|
b64os.close();
|
||||||
|
return new String(os.toByteArray(), StandardCharsets.UTF_8);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
t.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String uncompress(final String compressed) throws Exception {
|
||||||
|
Base64InputStream bis = new Base64InputStream(new ByteArrayInputStream(compressed.getBytes()));
|
||||||
|
GZIPInputStream gzip = new GZIPInputStream(bis);
|
||||||
|
return IOUtils.toString(gzip);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFeedIndex() throws Exception {
|
||||||
|
|
||||||
|
InputStream gzipStream = new GZIPInputStream(
|
||||||
|
getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_dump.gz"));
|
||||||
|
Reader decoder = new InputStreamReader(gzipStream, "UTF-8");
|
||||||
|
BufferedReader buffered = new BufferedReader(decoder);
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
GZIPOutputStream gzip = new GZIPOutputStream(Files.newOutputStream(Paths.get("/tmp/scholix_flat.gz")));
|
||||||
|
String line = buffered.readLine();
|
||||||
|
while (line != null) {
|
||||||
|
|
||||||
|
final Scholix s = mapper.readValue(line, Scholix.class);
|
||||||
|
final ScholixFlat flat = ScholixUtils.flattenizeScholix(s, compressString(line));
|
||||||
|
gzip.write(mapper.writeValueAsString(flat).concat("\n").getBytes(StandardCharsets.UTF_8));
|
||||||
|
line = buffered.readLine();
|
||||||
|
}
|
||||||
|
gzip.close();
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf()
|
||||||
|
.setAppName(SparkIndexCollectionOnES.class.getSimpleName())
|
||||||
|
.setMaster("local[*]");
|
||||||
|
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
|
||||||
|
|
||||||
|
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
||||||
|
|
||||||
|
JavaRDD<String> inputRdd = sc.textFile("/tmp/scholix_flat.gz");
|
||||||
|
|
||||||
|
Map<String, String> esCfg = new HashMap<>();
|
||||||
|
esCfg.put("es.nodes", "localhost");
|
||||||
|
esCfg.put("es.mapping.id", "identifier");
|
||||||
|
esCfg.put("es.batch.write.retry.count", "8");
|
||||||
|
esCfg.put("es.batch.write.retry.wait", "60s");
|
||||||
|
esCfg.put("es.batch.size.entries", "200");
|
||||||
|
esCfg.put("es.nodes.wan.only", "true");
|
||||||
|
JavaEsSpark.saveJsonToEs(inputRdd, "scholix", esCfg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
|
@ -0,0 +1,46 @@
|
||||||
|
package eu.dnetlib.dhp.sx.provision
|
||||||
|
|
||||||
|
import org.apache.spark.SparkConf
|
||||||
|
import org.apache.spark.sql.SparkSession
|
||||||
|
import org.junit.Before
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.{Test}
|
||||||
|
|
||||||
|
/** Smoke test that reads the bundled Scholix dump through a local Spark session and prints
  * how many text lines Spark sees. It performs no assertion on the count.
  */
class ScholixFlatTest {

  // Stays null until initSpark() is called; reset to null by after().
  var spark: SparkSession = null

  /** Creates the local SparkSession on first use; subsequent calls are no-ops. */
  def initSpark(): Unit = {
    if (spark == null) {
      val conf = new SparkConf()
        .setAppName(getClass.getSimpleName)
        .setMaster("local[*]")
        .set("spark.driver.host", "localhost")
        .set("hive.metastore.local", "true")
        .set("spark.ui.enabled", "false")

      spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate()
    }
  }

  /** Stops the session after each test so it is not leaked; safe to call when the session
    * was never created.
    */
  @org.junit.jupiter.api.AfterEach
  def after(): Unit = {
    Option(spark).foreach(_.stop())
    spark = null
  }

  /** Counts the lines of the Scholix dump resource and prints the total. */
  @Test
  def testScholixConversion(): Unit = {
    initSpark()

    // NOTE(review): the Java FlatIndexTest reads "scholix_dump.gz" from the same folder;
    // confirm that a ".zip" resource really exists and is what should be read here.
    val resourceName = "/eu/dnetlib/dhp/sx/provision/scholix_dump.zip"
    val p = Option(getClass.getResource(resourceName))
      .map(_.getPath)
      .getOrElse(throw new IllegalStateException(s"Test resource not found on classpath: $resourceName"))

    val t = spark.read.text(p).count
    println(s"total =$t")
  }

}
|
2
pom.xml
2
pom.xml
|
@ -807,7 +807,7 @@
|
||||||
<mockito-core.version>3.3.3</mockito-core.version>
|
<mockito-core.version>3.3.3</mockito-core.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
<vtd.version>[2.12,3.0)</vtd.version>
|
<vtd.version>[2.12,3.0)</vtd.version>
|
||||||
<dhp-schemas.version>[3.16.0]</dhp-schemas.version>
|
<dhp-schemas.version>[3.16.1-SNAPSHOT]</dhp-schemas.version>
|
||||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||||
|
|
Loading…
Reference in New Issue