improved parallelization on transformation job

This commit is contained in:
Sandro La Bruzzo 2021-04-19 15:14:52 +02:00
parent 3ae67b7a1d
commit cdfe01bbae
7 changed files with 98 additions and 89 deletions

View File

@ -11,6 +11,9 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
@ -26,6 +29,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.message.MessageSender;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -33,6 +37,8 @@ public class TransformSparkJobNode {
private static final Logger log = LoggerFactory.getLogger(TransformSparkJobNode.class);
private static int RECORDS_PER_TASK = 200;
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -67,6 +73,11 @@ public class TransformSparkJobNode {
final String dateOfTransformation = parser.get("dateOfTransformation");
log.info(String.format("dateOfTransformation: %s", dateOfTransformation));
final Integer rpt = Optional
.ofNullable(parser.get("recordsPerTask"))
.map(Integer::valueOf)
.orElse(RECORDS_PER_TASK);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
@ -79,12 +90,12 @@ public class TransformSparkJobNode {
isSparkSessionManaged,
spark -> {
transformRecords(
parser.getObjectMap(), isLookupService, spark, inputPath, outputBasePath);
parser.getObjectMap(), isLookupService, spark, inputPath, outputBasePath, rpt);
});
}
public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,
final SparkSession spark, final String inputPath, final String outputBasePath)
final SparkSession spark, final String inputPath, final String outputBasePath, final Integer rpt)
throws DnetTransformationException, IOException {
final LongAccumulator totalItems = spark.sparkContext().longAccumulator(CONTENT_TOTALITEMS);
@ -99,18 +110,25 @@ public class TransformSparkJobNode {
final String workflowId = args.get("workflowId");
log.info("workflowId is {}", workflowId);
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
try (AggregatorReport report = new AggregatorReport(messageSender)) {
try {
final Dataset<MetadataRecord> mdstore = spark
MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
.getTransformationPlugin(args, ct, isLookUpService);
final Dataset<MetadataRecord> inputMDStore = spark
.read()
.format("parquet")
.load(inputPath)
.as(encoder)
.map(
TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
encoder);
saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH);
.as(encoder);
final long totalInput = inputMDStore.count();
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
try (AggregatorReport report = new AggregatorReport(messageSender)) {
try {
JavaRDD<MetadataRecord> mdstore = inputMDStore
.javaRDD()
.repartition(getRepartitionNumber(totalInput, rpt))
.map((Function<MetadataRecord, MetadataRecord>) x::call);
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());

View File

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.transformation.xslt;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
@ -15,7 +16,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import net.sf.saxon.s9api.*;
public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord>, Serializable {
public final static String QNAME_BASE_URI = "http://eu/dnetlib/transform";
@ -27,6 +28,8 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
private final long dateOfTransformation;
private final VocabularyGroup vocabularies;
public XSLTTransformationFunction(
final AggregationCounter aggregationCounter,
final String transformationRule,
@ -35,6 +38,7 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
throws Exception {
this.aggregationCounter = aggregationCounter;
this.transformationRule = transformationRule;
this.vocabularies = vocabularies;
this.dateOfTransformation = dateOfTransformation;
cleanFunction = new Cleaner(vocabularies);
}
@ -73,4 +77,24 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
throw new RuntimeException(e);
}
}
public AggregationCounter getAggregationCounter() {
return aggregationCounter;
}
public String getTransformationRule() {
return transformationRule;
}
public Cleaner getCleanFunction() {
return cleanFunction;
}
public long getDateOfTransformation() {
return dateOfTransformation;
}
public VocabularyGroup getVocabularies() {
return vocabularies;
}
}

View File

@ -37,6 +37,12 @@
<name>dnetMessageManagerURL</name>
<description>The URI of the Dnet Message Manager</description>
</property>
<property>
<name>recordsPerTask</name>
<value>200</value>
<description>The URI of the Dnet Message Manager</description>
</property>
</parameters>
<start to="BeginRead"/>
@ -103,6 +109,7 @@
<arg>--transformationPlugin</arg><arg>${transformationPlugin}</arg>
<arg>--transformationRuleId</arg><arg>${transformationRuleId}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--recordsPerTask</arg><arg>${recordsPerTask}</arg>
<arg>--workflowId</arg><arg>${workflowId}</arg>
<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
</spark>

View File

@ -48,6 +48,12 @@
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "rpt",
"paramLongName": "recordsPerTask",
"paramDescription": "the number of records transformed by a single Task",
"paramRequired": false
},
{
"paramName": "tp",
"paramLongName": "transformationPlugin",

View File

@ -180,7 +180,7 @@ public class GenerateNativeStoreSparkJobTest extends AbstractVocabularyTest {
TransformSparkJobNode
.transformRecords(
parameters, isLookUpService, spark, mdStoreV2.getHdfsPath() + MDSTORE_DATA_PATH,
mdStoreCleanedVersion.getHdfsPath());
mdStoreCleanedVersion.getHdfsPath(), 200);
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mOutput = spark

View File

@ -167,7 +167,8 @@ public class TransformationJobTest extends AbstractVocabularyTest {
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output);
TransformSparkJobNode
.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output, 200);
// TODO introduce useful assertions
@ -221,7 +222,8 @@ public class TransformationJobTest extends AbstractVocabularyTest {
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output);
TransformSparkJobNode
.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output, 200);
// TODO introduce useful assertions

View File

@ -1,16 +1,13 @@
<xsl:stylesheet
version="2.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaf="http://namespace.openaire.eu/oaf"
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:vocabulary="http://eu/dnetlib/transform/clean"
xmlns:dateCleaner="http://eu/dnetlib/transform/dateISO"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
exclude-result-prefixes="xsl vocabulary dateCleaner">
exclude-result-prefixes="xsl vocabulary dateCleaner" version="2.0">
<xsl:param name="varOfficialName"/>
<xsl:param name="varDsType"/>
<xsl:param name="varDataSourceId"/>
<xsl:param name="varFP7" select="'corda_______::'"/>
<xsl:param name="varH2020" select="'corda__h2020::'"/>
<xsl:param name="varAKA" select="'aka_________::'"/>
@ -34,29 +31,22 @@
<xsl:param name="varTARA" select="'taraexp_____::'"/><!-- TARA awaiting DOI from André -->
<xsl:param name="varTUBITAK" select="'tubitakf____::'"/>
<xsl:param name="varWT" select="'wt__________::'"/>
<xsl:param name="index" select="0"/>
<xsl:param name="transDate" select="current-dateTime()"/>
<xsl:template match="/">
<xsl:variable name="datasourcePrefix" select="normalize-space(//oaf:datasourceprefix)"/>
<xsl:call-template name="validRecord"/>
</xsl:template>
<xsl:template name="validRecord">
<record>
<xsl:apply-templates select="//*[local-name() = 'header']"/>
<metadata>
<xsl:apply-templates select="//*[local-name() = 'metadata']//*[local-name() = 'resource']"/>
<xsl:if test="//*[local-name()='date']/@dateType='Available'">
<xsl:variable name='varEmbargoEndDate'
<xsl:variable name="varEmbargoEndDate"
select="dateCleaner:dateISO(normalize-space(//*[local-name()='date'][@dateType='Available']))"/>
<xsl:choose>
<xsl:when test="string-length($varEmbargoEndDate) > 0">
<xsl:when test="string-length($varEmbargoEndDate) &gt; 0">
<oaf:embargoenddate>
<xsl:value-of select="$varEmbargoEndDate"/>
</oaf:embargoenddate>
@ -68,55 +58,44 @@
</xsl:otherwise>
</xsl:choose>
</xsl:if>
<dr:CobjCategory>
<xsl:variable name="varCobjCategory"
select="vocabulary:clean( //*[local-name()='resourceType']/@resourceTypeGeneral, 'dnet:publication_resource')"/>
<xsl:variable name="varSuperType"
select="vocabulary:clean( $varCobjCategory, 'dnet:result_typologies')"/>
<xsl:attribute name="type">
<xsl:value-of select="$varSuperType"/>
</xsl:attribute>
<xsl:value-of select="$varCobjCategory"/>
</dr:CobjCategory>
<!-- review status -->
<!-- Zenodo -->
<xsl:variable name="varRefereedConvt" select="for $i in (//*[local-name()='resourceType']/(., @resourceTypeGeneral), //*[local-name()='version'])
return vocabulary:clean(normalize-space($i), 'dnet:review_levels')"/>
</dr:CobjCategory><!-- review status --><!-- Zenodo -->
<xsl:variable name="varRefereedConvt"
select="for $i in (//*[local-name()='resourceType']/(., @resourceTypeGeneral), //*[local-name()='version']) return vocabulary:clean(normalize-space($i), 'dnet:review_levels')"/>
<xsl:variable name="varRefereedIdntf"
select="//*[local-name()=('identifier', 'alternateIdentifier')][matches(lower-case(.), '.*([\s\-\.\\_/:]|%[2-7][0-9A-F])pre([\s\-\.\\_/:]|%[2-7][0-9A-F])?prints?([\s\-\.\\_/:%].*|$)')]/'0002' "/>
<xsl:variable name="varRefereedReltn"
select="//*[local-name()='relatedIdentifier'][./@relationType/lower-case(.)='isreviewedby']/'0001' "/>
<xsl:variable name="varRefereedVersn" select="(//*[local-name()='version'][matches(lower-case(.), '.*peer[\s\-\.\\_/:%]?reviewed.*')]/'0001',
//*[local-name()='version'][matches(normalize-space(lower-case(.)), '^(v|vs|version|rel|release)?[\s\.\-_]*0$')]/'0002',
//*[local-name()='version'][matches(lower-case(.), '(^|[\s\-\.\\_/:%].*)(beta|draft|trial|test)([\s\-\.\\_/:%].*|$)')]/'0002',
//*[local-name()='version'][matches(lower-case(.), '.*submi(tted|ssion|ttal).*')]/'0002') "/>
<xsl:variable name="varRefereedOther" select="(//*[local-name()='publisher'][matches(lower-case(.), '.*[\s\-\.\\_/:%]pre[\s\-\.\\_/:%]?prints?([\s\-\.\\_/:%].*|$)')]/'0002',
//*[local-name()='description'][matches(lower-case(.), '^peer[\s\-\.\\_/:%]?reviewed$')]/'0001',
//*[local-name()='description'][matches(lower-case(.), '^pre[\s\-\.\\_/:%]?prints?$')]/'0002') "/>
<xsl:variable name="varRefereedVersn"
select="(//*[local-name()='version'][matches(lower-case(.), '.*peer[\s\-\.\\_/:%]?reviewed.*')]/'0001', //*[local-name()='version'][matches(normalize-space(lower-case(.)), '^(v|vs|version|rel|release)?[\s\.\-_]*0$')]/'0002', //*[local-name()='version'][matches(lower-case(.), '(^|[\s\-\.\\_/:%].*)(beta|draft|trial|test)([\s\-\.\\_/:%].*|$)')]/'0002', //*[local-name()='version'][matches(lower-case(.), '.*submi(tted|ssion|ttal).*')]/'0002') "/>
<xsl:variable name="varRefereedOther"
select="(//*[local-name()='publisher'][matches(lower-case(.), '.*[\s\-\.\\_/:%]pre[\s\-\.\\_/:%]?prints?([\s\-\.\\_/:%].*|$)')]/'0002', //*[local-name()='description'][matches(lower-case(.), '^peer[\s\-\.\\_/:%]?reviewed$')]/'0001', //*[local-name()='description'][matches(lower-case(.), '^pre[\s\-\.\\_/:%]?prints?$')]/'0002') "/>
<xsl:variable name="varRefereed"
select="($varRefereedConvt, $varRefereedIdntf, $varRefereedReltn, $varRefereedVersn, $varRefereedOther)"/>
<xsl:choose>
<xsl:when test="count($varRefereed[. = '0001']) > 0">
<xsl:when test="count($varRefereed[. = '0001']) &gt; 0">
<oaf:refereed>
<xsl:value-of select="'0001'"/>
</oaf:refereed>
</xsl:when>
<xsl:when test="count($varRefereed[. = '0002']) > 0">
<xsl:when test="count($varRefereed[. = '0002']) &gt; 0">
<oaf:refereed>
<xsl:value-of select="'0002'"/>
</oaf:refereed>
</xsl:when>
</xsl:choose>
<oaf:dateAccepted>
<xsl:value-of
select="dateCleaner:dateISO(normalize-space(//*[local-name()='publicationYear']))"/>
<xsl:value-of select="dateCleaner:dateISO(normalize-space(//*[local-name()='publicationYear']))"/>
</oaf:dateAccepted>
<xsl:choose>
<xsl:when
test="//*[local-name() = 'rights'][starts-with(normalize-space(.), 'info:eu-repo/semantics')]">
<oaf:accessrights>
@ -147,17 +126,12 @@
</xsl:choose>
</xsl:otherwise>
</xsl:choose>
<oaf:language>
<xsl:value-of
select="vocabulary:clean(//*[local-name()='language'], 'dnet:languages')"/>
<xsl:value-of select="vocabulary:clean(//*[local-name()='language'], 'dnet:languages')"/>
</oaf:language>
<xsl:for-each
select="//*[local-name()='nameIdentifier'][contains(., 'info:eu-repo/grantAgreement/')], //*[local-name()='fundingReference']/*[local-name()='awardNumber']">
<xsl:choose>
<xsl:when
test="matches(normalize-space(.), '(info:eu-repo/grantagreement/ec/fp7/)(\d\d\d\d\d\d)(.*)', 'i') or ../*[local-name() = 'funderIdentifier' and . = '10.13039/100011102']">
<oaf:projectid>
@ -308,13 +282,10 @@
select="concat($varWT, replace(normalize-space(.), '(info:eu-repo/grantagreement/wt/.*?/)([^/]*)(/.*)?', '$2', 'i'))"/>
</oaf:projectid>
</xsl:when>
</xsl:choose>
</xsl:for-each>
<xsl:for-each select="//*[local-name()='relatedIdentifier']">
<xsl:if
test="starts-with(./text(), 'https://zenodo.org/communities/')">
<xsl:if test="starts-with(./text(), 'https://zenodo.org/communities/')">
<oaf:concept>
<xsl:attribute name="id">
<xsl:value-of select="./text()"/>
@ -322,7 +293,6 @@
</oaf:concept>
</xsl:if>
</xsl:for-each>
<oaf:hostedBy>
<xsl:attribute name="name">
<xsl:value-of select="$varOfficialName"/>
@ -343,24 +313,19 @@
<xsl:copy-of select="//*[local-name() = 'about']"/>
</record>
</xsl:template>
<xsl:template match="node()|@*">
<xsl:copy>
<xsl:apply-templates select="node()|@*"/>
</xsl:copy>
</xsl:template>
<xsl:template match="//*[local-name() = 'metadata']//*[local-name() = 'resource']">
<xsl:copy>
<xsl:apply-templates select="node()|@*"/>
</xsl:copy>
</xsl:template>
<xsl:template match="//*[local-name() = 'resource']/*[local-name()='alternateIdentifiers']">
<xsl:element name="alternateIdentifiers" namespace="http://www.openarchives.org/OAI/2.0/">
<xsl:copy-of select="./*"/>
<xsl:if test="//*[local-name() = 'resource']/*[local-name()='identifier'][@identifierType='Handle']">
<xsl:element name="alternateIdentifier" namespace="http://www.openarchives.org/OAI/2.0/">
<xsl:attribute name="alternateIdentifierType">
@ -370,7 +335,6 @@
select="concat('http://hdl.handle.net/', //*[local-name() = 'resource']/*[local-name()='identifier'])"/>
</xsl:element>
</xsl:if>
<xsl:if test="//*[local-name() = 'resource']/*[local-name()='identifier'][@identifierType='URN']">
<xsl:element name="alternateIdentifier" namespace="http://www.openarchives.org/OAI/2.0/">
<xsl:attribute name="alternateIdentifierType">
@ -380,7 +344,6 @@
select="concat('http://nbn-resolving.org/', //*[local-name() = 'resource']/*[local-name()='identifier'])"/>
</xsl:element>
</xsl:if>
<xsl:if test="//*[local-name() = 'resource']/*[local-name()='identifier'][@identifierType='DOI']">
<xsl:element name="alternateIdentifier" namespace="http://www.openarchives.org/OAI/2.0/">
<xsl:attribute name="alternateIdentifierType">
@ -390,11 +353,8 @@
select="concat('http://dx.doi.org/', //*[local-name() = 'resource']/*[local-name()='identifier'])"/>
</xsl:element>
</xsl:if>
</xsl:element>
</xsl:template>
<xsl:template match="//*[local-name() = 'resource']/*[local-name()='identifier']">
<xsl:copy-of select="."/>
<xsl:if test="not(//*[local-name() = 'resource']/*[local-name()='alternateIdentifiers'])">
@ -404,8 +364,7 @@
<xsl:attribute name="alternateIdentifierType">
<xsl:value-of select="'URL'"/>
</xsl:attribute>
<xsl:value-of
select="concat('http://hdl.handle.net/', .)"/>
<xsl:value-of select="concat('http://hdl.handle.net/', .)"/>
</xsl:element>
</xsl:if>
<xsl:if test=".[@identifierType='URN']">
@ -413,8 +372,7 @@
<xsl:attribute name="alternateIdentifierType">
<xsl:value-of select="'URL'"/>
</xsl:attribute>
<xsl:value-of
select="concat('http://nbn-resolving.org/', .)"/>
<xsl:value-of select="concat('http://nbn-resolving.org/', .)"/>
</xsl:element>
</xsl:if>
<xsl:if test=".[@identifierType='DOI']">
@ -422,17 +380,12 @@
<xsl:attribute name="alternateIdentifierType">
<xsl:value-of select="'URL'"/>
</xsl:attribute>
<xsl:value-of
select="concat('http://dx.doi.org/', .)"/>
<xsl:value-of select="concat('http://dx.doi.org/', .)"/>
</xsl:element>
</xsl:if>
</xsl:element>
</xsl:if>
</xsl:template>
<xsl:template match="//*[local-name() = 'header']">
<xsl:copy>
<xsl:apply-templates select="node()|@*"/>
@ -441,5 +394,4 @@
</xsl:element>
</xsl:copy>
</xsl:template>
</xsl:stylesheet>