forked from D-Net/dnet-hadoop
[graph provision] adds the possibility to validate the XML records before storing them via the validateXML parameter
This commit is contained in:
parent
39a2afe8b5
commit
55f39f7850
|
@ -64,6 +64,12 @@ public class PayloadConverterJob {
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
final Boolean validateXML = Optional
|
||||||
|
.ofNullable(parser.get("validateXML"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.FALSE);
|
||||||
|
log.info("validateXML: {}", validateXML);
|
||||||
|
|
||||||
final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
|
final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
|
||||||
log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
|
log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
|
||||||
|
|
||||||
|
@ -78,18 +84,19 @@ public class PayloadConverterJob {
|
||||||
|
|
||||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
convertToXml(
|
createPayloads(
|
||||||
spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl),
|
spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl),
|
||||||
VocabularyGroup.loadVocsFromIS(isLookup));
|
VocabularyGroup.loadVocsFromIS(isLookup), validateXML);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void convertToXml(
|
private static void createPayloads(
|
||||||
final SparkSession spark,
|
final SparkSession spark,
|
||||||
final String inputPath,
|
final String inputPath,
|
||||||
final String outputPath,
|
final String outputPath,
|
||||||
final ContextMapper contextMapper,
|
final ContextMapper contextMapper,
|
||||||
final VocabularyGroup vocabularies) {
|
final VocabularyGroup vocabularies,
|
||||||
|
final Boolean validateXML) {
|
||||||
|
|
||||||
final XmlRecordFactory recordFactory = new XmlRecordFactory(
|
final XmlRecordFactory recordFactory = new XmlRecordFactory(
|
||||||
prepareAccumulators(spark.sparkContext()),
|
prepareAccumulators(spark.sparkContext()),
|
||||||
|
@ -110,7 +117,7 @@ public class PayloadConverterJob {
|
||||||
.as(Encoders.kryo(JoinedEntity.class))
|
.as(Encoders.kryo(JoinedEntity.class))
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
|
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
|
||||||
recordFactory.build(je),
|
recordFactory.build(je, validateXML),
|
||||||
ProvisionModelSupport.transform(je, contextMapper, vocabularies)),
|
ProvisionModelSupport.transform(je, contextMapper, vocabularies)),
|
||||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(SolrRecord.class)))
|
Encoders.tuple(Encoders.STRING(), Encoders.bean(SolrRecord.class)))
|
||||||
.map(
|
.map(
|
||||||
|
|
|
@ -22,5 +22,11 @@
|
||||||
"paramLongName": "isLookupUrl",
|
"paramLongName": "isLookupUrl",
|
||||||
"paramDescription": "URL of the context ISLookup Service",
|
"paramDescription": "URL of the context ISLookup Service",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "val",
|
||||||
|
"paramLongName": "validateXML",
|
||||||
|
"paramDescription": "should the process check the XML validity",
|
||||||
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -13,6 +13,11 @@
|
||||||
<name>contextApiBaseUrl</name>
|
<name>contextApiBaseUrl</name>
|
||||||
<description>context API URL</description>
|
<description>context API URL</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>validateXML</name>
|
||||||
|
<description>should the payload converter validate the XMLs</description>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>relPartitions</name>
|
<name>relPartitions</name>
|
||||||
<description>number or partitions for the relations Dataset</description>
|
<description>number or partitions for the relations Dataset</description>
|
||||||
|
@ -610,6 +615,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>/user/claudio.atzori/data/beta_provision/join_entities</arg>
|
<arg>--inputPath</arg><arg>/user/claudio.atzori/data/beta_provision/join_entities</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/xml_json</arg>
|
<arg>--outputPath</arg><arg>${workingDir}/xml_json</arg>
|
||||||
|
<arg>--validateXML</arg><arg>${validateXML}</arg>
|
||||||
<arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
|
<arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
|
Loading…
Reference in New Issue