Implemented new Transformation using Spark
parent
a54848a59c
commit
184e7b3856
@ -0,0 +1,45 @@
|
||||
package eu.dnetlib.dhp.aggregation.common;
|
||||
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
|
||||
public class AggregationCounter implements Serializable {
|
||||
private LongAccumulator totalItems;
|
||||
private LongAccumulator errorItems;
|
||||
private LongAccumulator processedItems;
|
||||
|
||||
public AggregationCounter() {
|
||||
}
|
||||
|
||||
public AggregationCounter(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator processedItems) {
|
||||
this.totalItems = totalItems;
|
||||
this.errorItems = errorItems;
|
||||
this.processedItems = processedItems;
|
||||
}
|
||||
|
||||
public LongAccumulator getTotalItems() {
|
||||
return totalItems;
|
||||
}
|
||||
|
||||
public void setTotalItems(LongAccumulator totalItems) {
|
||||
this.totalItems = totalItems;
|
||||
}
|
||||
|
||||
public LongAccumulator getErrorItems() {
|
||||
return errorItems;
|
||||
}
|
||||
|
||||
public void setErrorItems(LongAccumulator errorItems) {
|
||||
this.errorItems = errorItems;
|
||||
}
|
||||
|
||||
public LongAccumulator getProcessedItems() {
|
||||
return processedItems;
|
||||
}
|
||||
|
||||
public void setProcessedItems(LongAccumulator processedItems) {
|
||||
this.processedItems = processedItems;
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package eu.dnetlib.dhp.transformation;
|
||||
|
||||
/**
 * Checked exception raised when a transformation cannot be set up or executed.
 *
 * <p>Every constructor simply forwards its arguments to the matching
 * {@link Exception} constructor; no additional state is kept.
 */
public class DnetTransformationException extends Exception {

    /** Creates an exception with neither a detail message nor a cause. */
    public DnetTransformationException() {
        super();
    }

    /**
     * Creates an exception carrying only a detail message.
     *
     * @param message the detail message
     */
    public DnetTransformationException(String message) {
        super(message);
    }

    /**
     * Creates an exception wrapping another throwable.
     *
     * @param cause the underlying cause
     */
    public DnetTransformationException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with both a detail message and a cause.
     *
     * @param message the detail message
     * @param cause   the underlying cause
     */
    public DnetTransformationException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception with full control over suppression and stack trace capture.
     *
     * @param message            the detail message
     * @param cause              the underlying cause
     * @param enableSuppression  whether suppressed exceptions are recorded
     * @param writableStackTrace whether the stack trace is captured and writable
     */
    public DnetTransformationException(
        String message,
        Throwable cause,
        boolean enableSuppression,
        boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
|
@ -1,74 +0,0 @@
|
||||
|
||||
package eu.dnetlib.dhp.transformation;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.transformation.functions.Cleaner;
|
||||
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
|
||||
import net.sf.saxon.s9api.*;
|
||||
|
||||
public class TransformFunction implements MapFunction<MetadataRecord, MetadataRecord> {
|
||||
|
||||
private final LongAccumulator totalItems;
|
||||
private final LongAccumulator errorItems;
|
||||
private final LongAccumulator transformedItems;
|
||||
private final String transformationRule;
|
||||
private final Cleaner cleanFunction;
|
||||
|
||||
private final long dateOfTransformation;
|
||||
|
||||
public TransformFunction(
|
||||
LongAccumulator totalItems,
|
||||
LongAccumulator errorItems,
|
||||
LongAccumulator transformedItems,
|
||||
final String transformationRule,
|
||||
long dateOfTransformation,
|
||||
final Map<String, Vocabulary> vocabularies)
|
||||
throws Exception {
|
||||
this.totalItems = totalItems;
|
||||
this.errorItems = errorItems;
|
||||
this.transformedItems = transformedItems;
|
||||
this.transformationRule = transformationRule;
|
||||
this.dateOfTransformation = dateOfTransformation;
|
||||
cleanFunction = new Cleaner(vocabularies);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetadataRecord call(MetadataRecord value) {
|
||||
totalItems.add(1);
|
||||
try {
|
||||
Processor processor = new Processor(false);
|
||||
processor.registerExtensionFunction(cleanFunction);
|
||||
final XsltCompiler comp = processor.newXsltCompiler();
|
||||
XsltExecutable xslt = comp
|
||||
.compile(new StreamSource(new ByteArrayInputStream(transformationRule.getBytes())));
|
||||
XdmNode source = processor
|
||||
.newDocumentBuilder()
|
||||
.build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())));
|
||||
XsltTransformer trans = xslt.load();
|
||||
trans.setInitialContextNode(source);
|
||||
final StringWriter output = new StringWriter();
|
||||
Serializer out = processor.newSerializer(output);
|
||||
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
||||
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
||||
trans.setDestination(out);
|
||||
trans.transform();
|
||||
final String xml = output.toString();
|
||||
value.setBody(xml);
|
||||
value.setDateOfTransformation(dateOfTransformation);
|
||||
transformedItems.add(1);
|
||||
return value;
|
||||
} catch (Throwable e) {
|
||||
errorItems.add(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package eu.dnetlib.dhp.transformation;
|
||||
|
||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TransformationFactory {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class);
|
||||
public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()";
|
||||
|
||||
|
||||
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(final Map<String,String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService) throws DnetTransformationException {
|
||||
|
||||
try {
|
||||
final String transformationPlugin = jobArgument.get("transformationPlugin");
|
||||
|
||||
log.info("Transformation plugin required "+transformationPlugin);
|
||||
switch (transformationPlugin) {
|
||||
case "XSLT_TRANSFORM": {
|
||||
final String transformationRuleName = jobArgument.get("transformationRule");
|
||||
if (StringUtils.isBlank(transformationRuleName))
|
||||
throw new DnetTransformationException("Missing Parameter transformationRule");
|
||||
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
|
||||
|
||||
final String transformationRule = queryTransformationRuleFromIS(transformationRuleName, isLookupService);
|
||||
|
||||
final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation"));
|
||||
return new XSLTTransformationFunction(counters,transformationRule,dateOfTransformation,vocabularies);
|
||||
|
||||
}
|
||||
default:
|
||||
throw new DnetTransformationException("transformation plugin does not exists for " + transformationPlugin);
|
||||
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
throw new DnetTransformationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static String queryTransformationRuleFromIS(final String transformationRuleName, final ISLookUpService isLookUpService) throws Exception {
|
||||
final String query = String.format(TRULE_XQUERY, transformationRuleName);
|
||||
log.info("asking query to IS: "+ query);
|
||||
List<String> result = isLookUpService.quickSearchProfile(query);
|
||||
|
||||
if (result==null || result.isEmpty())
|
||||
throw new DnetTransformationException("Unable to find transformation rule with name: "+ transformationRuleName);
|
||||
return result.get(0);
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
|
||||
package eu.dnetlib.dhp.transformation.xslt;
|
||||
|
||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import net.sf.saxon.s9api.*;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.StringWriter;
|
||||
|
||||
public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
|
||||
|
||||
private final AggregationCounter aggregationCounter;
|
||||
|
||||
private final String transformationRule;
|
||||
|
||||
private final Cleaner cleanFunction;
|
||||
|
||||
private final long dateOfTransformation;
|
||||
|
||||
public XSLTTransformationFunction(
|
||||
final AggregationCounter aggregationCounter,
|
||||
final String transformationRule,
|
||||
long dateOfTransformation,
|
||||
final VocabularyGroup vocabularies)
|
||||
throws Exception {
|
||||
this.aggregationCounter = aggregationCounter;
|
||||
this.transformationRule = transformationRule;
|
||||
this.dateOfTransformation = dateOfTransformation;
|
||||
cleanFunction = new Cleaner(vocabularies);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetadataRecord call(MetadataRecord value) {
|
||||
aggregationCounter.getTotalItems().add(1);
|
||||
try {
|
||||
Processor processor = new Processor(false);
|
||||
processor.registerExtensionFunction(cleanFunction);
|
||||
final XsltCompiler comp = processor.newXsltCompiler();
|
||||
XsltExecutable xslt = comp
|
||||
.compile(new StreamSource(new ByteArrayInputStream(transformationRule.getBytes())));
|
||||
XdmNode source = processor
|
||||
.newDocumentBuilder()
|
||||
.build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())));
|
||||
XsltTransformer trans = xslt.load();
|
||||
trans.setInitialContextNode(source);
|
||||
final StringWriter output = new StringWriter();
|
||||
Serializer out = processor.newSerializer(output);
|
||||
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
||||
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
||||
trans.setDestination(out);
|
||||
trans.transform();
|
||||
final String xml = output.toString();
|
||||
value.setBody(xml);
|
||||
value.setDateOfTransformation(dateOfTransformation);
|
||||
aggregationCounter.getProcessedItems().add(1);
|
||||
return value;
|
||||
} catch (Throwable e) {
|
||||
aggregationCounter.getErrorItems().add(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue