merged from master
commit
3f34757c63
@ -0,0 +1,206 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.Option;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* Groups the graph content by entity identifier to ensure ID uniqueness
|
||||
*/
|
||||
public class GroupEntitiesAndRelationsSparkJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class);
|
||||
|
||||
private final static String ID_JPATH = "$.id";
|
||||
|
||||
private final static String SOURCE_JPATH = "$.source";
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
GroupEntitiesAndRelationsSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String graphInputPath = parser.get("graphInputPath");
|
||||
log.info("graphInputPath: {}", graphInputPath);
|
||||
|
||||
String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||
groupEntitiesAndRelations(spark, graphInputPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void groupEntitiesAndRelations(
|
||||
SparkSession spark,
|
||||
String inputPath,
|
||||
String outputPath) {
|
||||
|
||||
TypedColumn<Oaf, Oaf> aggregator = new GroupingAggregator().toColumn();
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
spark
|
||||
.read()
|
||||
.textFile(toSeq(listPaths(inputPath, sc)))
|
||||
.map((MapFunction<String, Oaf>) s -> parseOaf(s), Encoders.kryo(Oaf.class))
|
||||
.filter((FilterFunction<Oaf>) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf)))
|
||||
.groupByKey((MapFunction<Oaf, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
|
||||
.agg(aggregator)
|
||||
.map(
|
||||
(MapFunction<Tuple2<String, Oaf>, String>) t -> t._2().getClass().getName() +
|
||||
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
|
||||
Encoders.STRING())
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.text(outputPath);
|
||||
}
|
||||
|
||||
public static class GroupingAggregator extends Aggregator<Oaf, Oaf, Oaf> {
|
||||
|
||||
@Override
|
||||
public Oaf zero() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Oaf reduce(Oaf b, Oaf a) {
|
||||
return mergeAndGet(b, a);
|
||||
}
|
||||
|
||||
private Oaf mergeAndGet(Oaf b, Oaf a) {
|
||||
if (Objects.nonNull(a) && Objects.nonNull(b)) {
|
||||
return OafMapperUtils.merge(b, a);
|
||||
}
|
||||
return Objects.isNull(a) ? b : a;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Oaf merge(Oaf b, Oaf a) {
|
||||
return mergeAndGet(b, a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Oaf finish(Oaf j) {
|
||||
return j;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<Oaf> bufferEncoder() {
|
||||
return Encoders.kryo(Oaf.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<Oaf> outputEncoder() {
|
||||
return Encoders.kryo(Oaf.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static Oaf parseOaf(String s) {
|
||||
|
||||
DocumentContext dc = JsonPath
|
||||
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
|
||||
final String id = dc.read(ID_JPATH);
|
||||
if (StringUtils.isNotBlank(id)) {
|
||||
|
||||
String prefix = StringUtils.substringBefore(id, "|");
|
||||
switch (prefix) {
|
||||
case "10":
|
||||
return parse(s, Datasource.class);
|
||||
case "20":
|
||||
return parse(s, Organization.class);
|
||||
case "40":
|
||||
return parse(s, Project.class);
|
||||
case "50":
|
||||
String resultType = dc.read("$.resulttype.classid");
|
||||
switch (resultType) {
|
||||
case "publication":
|
||||
return parse(s, Publication.class);
|
||||
case "dataset":
|
||||
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||
case "software":
|
||||
return parse(s, Software.class);
|
||||
case "other":
|
||||
return parse(s, OtherResearchProduct.class);
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
|
||||
}
|
||||
} else {
|
||||
String source = dc.read(SOURCE_JPATH);
|
||||
if (StringUtils.isNotBlank(source)) {
|
||||
return parse(s, Relation.class);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends Oaf> Oaf parse(String s, Class<T> clazz) {
|
||||
try {
|
||||
return OBJECT_MAPPER.readValue(s, clazz);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static List<String> listPaths(String inputPath, JavaSparkContext sc) {
|
||||
return HdfsSupport
|
||||
.listFiles(inputPath, sc.hadoopConfiguration())
|
||||
.stream()
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "gin",
|
||||
"paramLongName": "graphInputPath",
|
||||
"paramDescription": "the graph root path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the output merged graph root path",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
@ -0,0 +1,113 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
|
||||
xmlns:oaf="http://namespace.openaire.eu/oaf"
|
||||
xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<oai:header xmlns="http://namespace.openaire.eu/"
|
||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
|
||||
<dri:objIdentifier>r3f52792889d::000051aa1f61d77d2c0b340091f8024e</dri:objIdentifier>
|
||||
<dri:recordIdentifier>textgrid:q9cv.0</dri:recordIdentifier>
|
||||
<dri:dateOfCollection>2020-11-17T09:34:11.128+01:00</dri:dateOfCollection>
|
||||
<oaf:datasourceprefix>r3f52792889d</oaf:datasourceprefix>
|
||||
<identifier xmlns="http://www.openarchives.org/OAI/2.0/">textgrid:q9cv.0</identifier>
|
||||
<datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2012-01-21T13:35:20Z</datestamp>
|
||||
<dr:dateOfTransformation>2020-11-17T09:46:21.551+01:00</dr:dateOfTransformation>
|
||||
</oai:header>
|
||||
<metadata>
|
||||
<datacite:resource xmlns="http://www.openarchives.org/OAI/2.0/"
|
||||
xmlns:datacite="http://datacite.org/schema/kernel-3"
|
||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
|
||||
<datacite:identifier identifierType="Handle">hdl:11858/00-1734-0000-0003-7664-F</datacite:identifier>
|
||||
<datacite:creators>
|
||||
<datacite:creator>
|
||||
<datacite:creatorName>Hoffmann von Fallersleben, August Heinrich</datacite:creatorName>
|
||||
<datacite:nameIdentifier nameIdentifierScheme="pnd" schemeURI="https://de.dariah.eu/pnd-service">118552589</datacite:nameIdentifier>
|
||||
</datacite:creator>
|
||||
</datacite:creators>
|
||||
<datacite:titles>
|
||||
<datacite:title titleType="Other">Mailied</datacite:title>
|
||||
<datacite:title titleType="Other">August Heinrich Hoffmann von Fallersleben: Unpolitische Lieder von Hoffmann von Fallersleben, 1. + 2. Theil, 1. Theil, Hamburg: Hoffmann und Campe, 1841.</datacite:title>
|
||||
</datacite:titles>
|
||||
<datacite:publisher>TextGrid</datacite:publisher>
|
||||
<datacite:publicationYear>2012</datacite:publicationYear>
|
||||
<datacite:contributors>
|
||||
<datacite:contributor contributorType="DataManager">
|
||||
<datacite:contributorName>tvitt@textgrid.de</datacite:contributorName>
|
||||
</datacite:contributor>
|
||||
<datacite:contributor contributorType="Other">
|
||||
<datacite:contributorName>Digitale Bibliothek</datacite:contributorName>
|
||||
<datacite:nameIdentifier nameIdentifierScheme="textgrid">TGPR-372fe6dc-57f2-6cd4-01b5-2c4bbefcfd3c</datacite:nameIdentifier>
|
||||
</datacite:contributor>
|
||||
</datacite:contributors>
|
||||
<datacite:dates>
|
||||
<datacite:date dateType="Created">2012-01-21T13:35:20Z</datacite:date>
|
||||
<datacite:date dateType="Issued">2012-01-21T13:35:20Z</datacite:date>
|
||||
<datacite:date dateType="Updated">2012-01-21T13:35:20Z</datacite:date>
|
||||
</datacite:dates>
|
||||
<datacite:resourceType resourceTypeGeneral="Dataset"/>
|
||||
<alternateIdentifiers>
|
||||
<datacite:alternateIdentifier alternateIdentifierType="URI">textgrid:q9cv.0</datacite:alternateIdentifier>
|
||||
<alternateIdentifier alternateIdentifierType="URL">http://hdl.handle.net/hdl:11858/00-1734-0000-0003-7664-F</alternateIdentifier>
|
||||
</alternateIdentifiers>
|
||||
<datacite:relatedIdentifiers>
|
||||
<datacite:relatedIdentifier relatedIdentifierType="Handle" relationType="IsPartOf">hdl:11858/00-1734-0000-0003-7666-B</datacite:relatedIdentifier>
|
||||
</datacite:relatedIdentifiers>
|
||||
<datacite:sizes>
|
||||
<datacite:size>527 Bytes</datacite:size>
|
||||
</datacite:sizes>
|
||||
<datacite:formats>
|
||||
<datacite:format>text/tg.edition+tg.aggregation+xml</datacite:format>
|
||||
</datacite:formats>
|
||||
<datacite:version>0</datacite:version>
|
||||
<datacite:rightsList>
|
||||
<datacite:rights rightsURI="http://creativecommons.org/licenses/by/3.0/de/legalcode"> Der annotierte Datenbestand der Digitalen Bibliothek inklusive
|
||||
Metadaten sowie davon einzeln zugängliche Teile sind eine Abwandlung
|
||||
des Datenbestandes von www.editura.de durch TextGrid und werden
|
||||
unter der Lizenz Creative Commons Namensnennung 3.0 Deutschland
|
||||
Lizenz (by-Nennung TextGrid) veröffentlicht. Die Lizenz bezieht sich
|
||||
nicht auf die der Annotation zu Grunde liegenden allgemeinfreien
|
||||
Texte (Siehe auch Punkt 2 der Lizenzbestimmungen).</datacite:rights>
|
||||
<datacite:rights rightsURI="info:eu-repo/semantics/openAccess"/>
|
||||
</datacite:rightsList>
|
||||
<datacite:descriptions>
|
||||
<datacite:description descriptionType="Abstract"/>
|
||||
</datacite:descriptions>
|
||||
<datacite:geoLocations>
|
||||
<datacite:geoLocation>
|
||||
<datacite:geoLocationPlace
|
||||
xmlns:xs="http://www.w3.org/2001/XMLSchema" xsi:type="xs:string">Hamburg</datacite:geoLocationPlace>
|
||||
</datacite:geoLocation>
|
||||
</datacite:geoLocations>
|
||||
</datacite:resource>
|
||||
<oaf:identifier identifierType="handle">hdl:11858/00-1734-0000-0003-7664-F</oaf:identifier>
|
||||
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
|
||||
<oaf:refereed>0002</oaf:refereed>
|
||||
<oaf:dateAccepted>2012-01-01</oaf:dateAccepted>
|
||||
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||
<oaf:license>http://creativecommons.org/licenses/by/3.0/de/legalcode</oaf:license>
|
||||
<oaf:language>und</oaf:language>
|
||||
<oaf:hostedBy id="re3data_____::r3d100011365" name="TextGrid Repository"/>
|
||||
<oaf:collectedFrom id="re3data_____::r3d100011365" name="TextGrid Repository"/>
|
||||
</metadata>
|
||||
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
|
||||
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||
<originDescription altered="true" harvestDate="2020-11-17T09:34:11.128+01:00">
|
||||
<baseURL>https%3A%2F%2Fdev.textgridlab.org%2F1.0%2Ftgoaipmh%2Foai</baseURL>
|
||||
<identifier>textgrid:q9cv.0</identifier>
|
||||
<datestamp>2012-01-21T13:35:20Z</datestamp>
|
||||
<metadataNamespace>http://schema.datacite.org/oai/oai-1.0/</metadataNamespace>
|
||||
</originDescription>
|
||||
</provenance>
|
||||
<oaf:datainfo>
|
||||
<oaf:inferred>false</oaf:inferred>
|
||||
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||
<oaf:trust>0.9</oaf:trust>
|
||||
<oaf:inferenceprovenance/>
|
||||
<oaf:provenanceaction classid="sysimport:crosswalk"
|
||||
classname="sysimport:crosswalk"
|
||||
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
||||
</oaf:datainfo>
|
||||
</about>
|
||||
</record>
|
@ -0,0 +1,14 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision;
|
||||
|
||||
public class ProvisionConstants {
|
||||
|
||||
public static final String LAYOUT = "index";
|
||||
public static final String INTERPRETATION = "openaire";
|
||||
public static final String SEPARATOR = "-";
|
||||
|
||||
public static String getCollectionName(String format) {
|
||||
return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
|
||||
}
|
||||
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
public abstract class SolrApplication {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SolrApplication.class);
|
||||
|
||||
protected static final String LAYOUT = "index";
|
||||
protected static final String INTERPRETATION = "openaire";
|
||||
protected static final String SEPARATOR = "-";
|
||||
protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
|
||||
|
||||
/**
|
||||
* Method retrieves from the information system the zookeeper quorum of the Solr server
|
||||
*
|
||||
* @param isLookup
|
||||
* @return the zookeeper quorum of the Solr server
|
||||
* @throws ISLookUpException
|
||||
*/
|
||||
protected static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
|
||||
return doLookup(
|
||||
isLookup,
|
||||
"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
|
||||
}
|
||||
|
||||
protected static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
|
||||
log.info(String.format("running xquery: %s", xquery));
|
||||
final String res = isLookup.getResourceProfileByQuery(xquery);
|
||||
log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision.model;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.SolrInputField;
|
||||
|
||||
/**
|
||||
* Wrapper class needed to make the SolrInputDocument compatible with the Kryo serialization mechanism.
|
||||
*/
|
||||
public class SerializableSolrInputDocument extends SolrInputDocument {
|
||||
|
||||
public SerializableSolrInputDocument() {
|
||||
super(new HashMap<>());
|
||||
}
|
||||
|
||||
public SerializableSolrInputDocument(Map<String, SolrInputField> fields) {
|
||||
super(fields);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision.utils;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.oa.provision.ProvisionConstants;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
public class ISLookupClient {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ISLookupClient.class);
|
||||
|
||||
private ISLookUpService isLookup;
|
||||
|
||||
public ISLookupClient(ISLookUpService isLookup) {
|
||||
this.isLookup = isLookup;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method retrieves from the information system the list of fields associated to the given MDFormat name
|
||||
*
|
||||
* @param format the Metadata format name
|
||||
* @return the string representation of the list of fields to be indexed
|
||||
* @throws ISLookUpDocumentNotFoundException
|
||||
* @throws ISLookUpException
|
||||
*/
|
||||
public String getLayoutSource(final String format)
|
||||
throws ISLookUpDocumentNotFoundException, ISLookUpException {
|
||||
return doLookup(
|
||||
String
|
||||
.format(
|
||||
"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']",
|
||||
format, ProvisionConstants.LAYOUT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Method retrieves from the information system the openaireLayoutToRecordStylesheet
|
||||
*
|
||||
* @return the string representation of the XSLT contained in the transformation rule profile
|
||||
* @throws ISLookUpDocumentNotFoundException
|
||||
* @throws ISLookUpException
|
||||
*/
|
||||
public String getLayoutTransformer() throws ISLookUpException {
|
||||
return doLookup(
|
||||
"collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')"
|
||||
+ "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
|
||||
}
|
||||
|
||||
/**
|
||||
* Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
|
||||
*
|
||||
* @param format
|
||||
* @return the IndexDS identifier
|
||||
* @throws ISLookUpException
|
||||
*/
|
||||
public String getDsId(String format) throws ISLookUpException {
|
||||
return doLookup(
|
||||
String
|
||||
.format(
|
||||
"collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')"
|
||||
+ "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()",
|
||||
format));
|
||||
}
|
||||
|
||||
/**
|
||||
* Method retrieves from the information system the zookeeper quorum of the Solr server
|
||||
*
|
||||
* @return the zookeeper quorum of the Solr server
|
||||
* @throws ISLookUpException
|
||||
*/
|
||||
public String getZkHost() throws ISLookUpException {
|
||||
return doLookup(
|
||||
"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
|
||||
}
|
||||
|
||||
private String doLookup(String xquery) throws ISLookUpException {
|
||||
log.info(String.format("running xquery: %s", xquery));
|
||||
final String res = getIsLookup().getResourceProfileByQuery(xquery);
|
||||
log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
|
||||
return res;
|
||||
}
|
||||
|
||||
public ISLookUpService getIsLookup() {
|
||||
return isLookup;
|
||||
}
|
||||
|
||||
public void setIsLookup(ISLookUpService isLookup) {
|
||||
this.isLookup = isLookup;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,109 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.solr.client.solrj.embedded.JettyConfig;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class SolrTest {
|
||||
|
||||
protected static final Logger log = LoggerFactory.getLogger(SolrTest.class);
|
||||
|
||||
protected static final String FORMAT = "test";
|
||||
protected static final String DEFAULT_COLLECTION = FORMAT + "-index-openaire";
|
||||
protected static final String CONFIG_NAME = "testConfig";
|
||||
|
||||
protected static MiniSolrCloudCluster miniCluster;
|
||||
|
||||
@TempDir
|
||||
public static Path workingDir;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() throws Exception {
|
||||
|
||||
// random unassigned HTTP port
|
||||
final int jettyPort = 0;
|
||||
final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build();
|
||||
|
||||
log.info(String.format("working directory: %s", workingDir.toString()));
|
||||
System.setProperty("solr.log.dir", workingDir.resolve("logs").toString());
|
||||
|
||||
// create a MiniSolrCloudCluster instance
|
||||
miniCluster = new MiniSolrCloudCluster(2, workingDir.resolve("solr"), jettyConfig);
|
||||
|
||||
// Upload Solr configuration directory to ZooKeeper
|
||||
String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig";
|
||||
File configDir = new File(solrZKConfigDir);
|
||||
|
||||
miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME);
|
||||
|
||||
// override settings in the solrconfig include
|
||||
System.setProperty("solr.tests.maxBufferedDocs", "100000");
|
||||
System.setProperty("solr.tests.maxIndexingThreads", "-1");
|
||||
System.setProperty("solr.tests.ramBufferSizeMB", "100");
|
||||
|
||||
// use non-test classes so RandomizedRunner isn't necessary
|
||||
System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
|
||||
System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");
|
||||
System.setProperty("solr.lock.type", "single");
|
||||
|
||||
log.info(new ConfigSetAdminRequest.List().process(miniCluster.getSolrClient()).toString());
|
||||
log
|
||||
.info(
|
||||
CollectionAdminRequest.ClusterStatus
|
||||
.getClusterStatus()
|
||||
.process(miniCluster.getSolrClient())
|
||||
.toString());
|
||||
|
||||
NamedList<Object> res = createCollection(
|
||||
miniCluster.getSolrClient(), DEFAULT_COLLECTION, 4, 2, 20, CONFIG_NAME);
|
||||
res.forEach(o -> log.info(o.toString()));
|
||||
|
||||
miniCluster.getSolrClient().setDefaultCollection(DEFAULT_COLLECTION);
|
||||
|
||||
log
|
||||
.info(
|
||||
CollectionAdminRequest.ClusterStatus
|
||||
.getClusterStatus()
|
||||
.process(miniCluster.getSolrClient())
|
||||
.toString());
|
||||
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void shutDown() throws Exception {
|
||||
miniCluster.shutdown();
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
}
|
||||
|
||||
protected static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
|
||||
int replicationFactor, int maxShardsPerNode, String configName) throws Exception {
|
||||
ModifiableSolrParams modParams = new ModifiableSolrParams();
|
||||
modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
|
||||
modParams.set("name", name);
|
||||
modParams.set("numShards", numShards);
|
||||
modParams.set("replicationFactor", replicationFactor);
|
||||
modParams.set("collection.configName", configName);
|
||||
modParams.set("maxShardsPerNode", maxShardsPerNode);
|
||||
QueryRequest request = new QueryRequest(modParams);
|
||||
request.setPath("/admin/collections");
|
||||
return client.request(request);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.SolrInputField;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class XmlIndexingJobTest extends SolrTest {
|
||||
|
||||
protected static SparkSession spark;
|
||||
|
||||
private static final Integer batchSize = 100;
|
||||
|
||||
@Mock
|
||||
private ISLookUpService isLookUpService;
|
||||
|
||||
@Mock
|
||||
private ISLookupClient isLookupClient;
|
||||
|
||||
@BeforeEach
|
||||
public void prepareMocks() throws ISLookUpException, IOException {
|
||||
isLookupClient.setIsLookup(isLookUpService);
|
||||
|
||||
int solrPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort();
|
||||
|
||||
Mockito
|
||||
.when(isLookupClient.getDsId(Mockito.anyString()))
|
||||
.thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl");
|
||||
Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", solrPort));
|
||||
Mockito
|
||||
.when(isLookupClient.getLayoutSource(Mockito.anyString()))
|
||||
.thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml")));
|
||||
Mockito
|
||||
.when(isLookupClient.getLayoutTransformer())
|
||||
.thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl")));
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
public static void before() {
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(XmlIndexingJobTest.class.getSimpleName());
|
||||
conf.registerKryoClasses(new Class[] {
|
||||
SerializableSolrInputDocument.class
|
||||
});
|
||||
|
||||
conf.setMaster("local[1]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(XmlIndexingJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void tearDown() {
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXmlIndexingJob_onSolr() throws Exception {
|
||||
|
||||
String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
|
||||
|
||||
long nRecord = JavaSparkContext
|
||||
.fromSparkContext(spark.sparkContext())
|
||||
.sequenceFile(inputPath, Text.class, Text.class)
|
||||
.count();
|
||||
|
||||
new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, null).run(isLookupClient);
|
||||
|
||||
Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
|
||||
|
||||
QueryResponse rsp = miniCluster.getSolrClient().query(new SolrQuery().add(CommonParams.Q, "*:*"));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
nRecord, rsp.getResults().getNumFound(),
|
||||
"the number of indexed records should be equal to the number of input records");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXmlIndexingJob_saveOnHDFS() throws Exception {
|
||||
final String ID_XPATH = "//header/*[local-name()='objIdentifier']";
|
||||
|
||||
String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
|
||||
|
||||
final JavaPairRDD<Text, Text> xmlRecords = JavaSparkContext
|
||||
.fromSparkContext(spark.sparkContext())
|
||||
.sequenceFile(inputPath, Text.class, Text.class);
|
||||
long nRecord = xmlRecords.count();
|
||||
long xmlIdUnique = xmlRecords
|
||||
.map(t -> t._2().toString())
|
||||
.map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH))
|
||||
.distinct()
|
||||
.count();
|
||||
Assertions.assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");
|
||||
|
||||
final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
|
||||
new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, outputPath).run(isLookupClient);
|
||||
|
||||
final Dataset<SerializableSolrInputDocument> solrDocs = spark
|
||||
.read()
|
||||
.load(outputPath)
|
||||
.as(Encoders.kryo(SerializableSolrInputDocument.class));
|
||||
long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
|
||||
final SolrInputField id = doc.getField("__indexrecordidentifier");
|
||||
return id.getFirstValue().toString();
|
||||
}, Encoders.STRING())
|
||||
.distinct()
|
||||
.count();
|
||||
Assertions.assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output records");
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- If this file is found in the config directory, it will only be
|
||||
loaded once at startup. If it is found in Solr's data
|
||||
directory, it will be re-loaded every commit.
|
||||
|
||||
See http://wiki.apache.org/solr/QueryElevationComponent for more info
|
||||
|
||||
-->
|
||||
<elevate>
|
||||
<!-- Query elevation examples
|
||||
<query text="foo bar">
|
||||
<doc id="1" />
|
||||
<doc id="2" />
|
||||
<doc id="3" />
|
||||
</query>
|
||||
|
||||
for use with techproducts example
|
||||
|
||||
<query text="ipod">
|
||||
<doc id="MA147LL/A" /> put the actual ipod at the top
|
||||
<doc id="IW-02" exclude="true" /> exclude this cable
|
||||
</query>
|
||||
-->
|
||||
|
||||
</elevate>
|
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Loading…
Reference in New Issue