1
0
Fork 0

removed unuseful accumulator

This commit is contained in:
Enrico Ottonello 2022-07-31 01:03:29 +02:00
parent 657b0208a2
commit 64311b8be4
3 changed files with 22 additions and 21 deletions

View File

@ -80,17 +80,10 @@ public class SparkDownloadOrcidWorks {
LongAccumulator parsedWorksAcc = spark.sparkContext().longAccumulator("parsed_works"); LongAccumulator parsedWorksAcc = spark.sparkContext().longAccumulator("parsed_works");
LongAccumulator modifiedWorksAcc = spark.sparkContext().longAccumulator("modified_works"); LongAccumulator modifiedWorksAcc = spark.sparkContext().longAccumulator("modified_works");
LongAccumulator errorCodeFoundAcc = spark.sparkContext().longAccumulator("error_code_found"); LongAccumulator errorCodeFoundAcc = spark.sparkContext().longAccumulator("error_code_found");
LongAccumulator errorLoadingJsonFoundAcc = spark
.sparkContext()
.longAccumulator("error_loading_json_found");
LongAccumulator errorLoadingXMLFoundAcc = spark
.sparkContext()
.longAccumulator("error_loading_xml_found");
LongAccumulator errorParsingXMLFoundAcc = spark LongAccumulator errorParsingXMLFoundAcc = spark
.sparkContext() .sparkContext()
.longAccumulator("error_parsing_xml_found"); .longAccumulator("error_parsing_xml_found");
LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records"); LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
LongAccumulator errorsAcc = spark.sparkContext().longAccumulator("errors");
JavaPairRDD<Text, Text> updatedAuthorsRDD = sc JavaPairRDD<Text, Text> updatedAuthorsRDD = sc
.sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class); .sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class);
@ -107,11 +100,10 @@ public class SparkDownloadOrcidWorks {
if (statusCode.equals("200")) { if (statusCode.equals("200")) {
String compressedData = getJsonValue(jElement, "compressedData"); String compressedData = getJsonValue(jElement, "compressedData");
if (StringUtils.isEmpty(compressedData)) { if (StringUtils.isEmpty(compressedData)) {
errorLoadingJsonFoundAcc.add(1);
} else { } else {
String authorSummary = ArgumentApplicationParser.decompressValue(compressedData); String authorSummary = ArgumentApplicationParser.decompressValue(compressedData);
if (StringUtils.isEmpty(authorSummary)) { if (StringUtils.isEmpty(authorSummary)) {
errorLoadingXMLFoundAcc.add(1);
} else { } else {
try { try {
workIdLastModifiedDate = XMLRecordParser workIdLastModifiedDate = XMLRecordParser
@ -184,7 +176,6 @@ public class SparkDownloadOrcidWorks {
} else { } else {
downloaded.setStatusCode(-4); downloaded.setStatusCode(-4);
} }
errorsAcc.add(1);
} }
long endReq = System.currentTimeMillis(); long endReq = System.currentTimeMillis();
long reqTime = endReq - startReq; long reqTime = endReq - startReq;
@ -193,7 +184,6 @@ public class SparkDownloadOrcidWorks {
} }
if (downloadCompleted) { if (downloadCompleted) {
downloaded.setStatusCode(200); downloaded.setStatusCode(200);
downloadedRecordsAcc.add(1);
downloaded downloaded
.setCompressedData( .setCompressedData(
ArgumentApplicationParser ArgumentApplicationParser
@ -214,9 +204,20 @@ public class SparkDownloadOrcidWorks {
String works = ArgumentApplicationParser.decompressValue(compressedData); String works = ArgumentApplicationParser.decompressValue(compressedData);
// split a single xml containing multiple works into multiple xml (a single work for each xml) // split a single xml containing multiple works into multiple xml (a single work for each xml)
List<String> splittedWorks = XMLRecordParser List<String> splittedWorks = null;
.splitWorks(orcidId, works.getBytes(StandardCharsets.UTF_8)); try {
splittedWorks = XMLRecordParser
.splitWorks(orcidId, works.getBytes(StandardCharsets.UTF_8));
} catch (Throwable t) {
final DownloadedRecordData errDownloaded = new DownloadedRecordData();
errDownloaded.setOrcidId(orcidId);
errDownloaded.setLastModifiedDate(lastModifiedDate);
errDownloaded.setStatusCode(-10);
errDownloaded.setErrorMessage(t.getMessage());
splittedDownloadedWorks.add(errDownloaded.toTuple2());
errorParsingXMLFoundAcc.add(1);
return splittedDownloadedWorks.iterator();
}
splittedWorks.forEach(w -> { splittedWorks.forEach(w -> {
final DownloadedRecordData downloaded = new DownloadedRecordData(); final DownloadedRecordData downloaded = new DownloadedRecordData();
downloaded.setOrcidId(orcidId); downloaded.setOrcidId(orcidId);
@ -228,10 +229,12 @@ public class SparkDownloadOrcidWorks {
.setCompressedData( .setCompressedData(
ArgumentApplicationParser ArgumentApplicationParser
.compressArgument(w)); .compressArgument(w));
} catch (IOException e) { } catch (Throwable t) {
throw new RuntimeException(e); downloaded.setStatusCode(-11);
downloaded.setErrorMessage(t.getMessage());
} }
splittedDownloadedWorks.add(downloaded.toTuple2()); splittedDownloadedWorks.add(downloaded.toTuple2());
downloadedRecordsAcc.add(1);
}); });
return splittedDownloadedWorks.iterator(); return splittedDownloadedWorks.iterator();
@ -250,11 +253,8 @@ public class SparkDownloadOrcidWorks {
logger.info("parsedWorksAcc: {}", parsedWorksAcc.value()); logger.info("parsedWorksAcc: {}", parsedWorksAcc.value());
logger.info("modifiedWorksAcc: {}", modifiedWorksAcc.value()); logger.info("modifiedWorksAcc: {}", modifiedWorksAcc.value());
logger.info("errorCodeFoundAcc: {}", errorCodeFoundAcc.value()); logger.info("errorCodeFoundAcc: {}", errorCodeFoundAcc.value());
logger.info("errorLoadingJsonFoundAcc: {}", errorLoadingJsonFoundAcc.value());
logger.info("errorLoadingXMLFoundAcc: {}", errorLoadingXMLFoundAcc.value());
logger.info("errorParsingXMLFoundAcc: {}", errorParsingXMLFoundAcc.value()); logger.info("errorParsingXMLFoundAcc: {}", errorParsingXMLFoundAcc.value());
logger.info("downloadedRecordsAcc: {}", downloadedRecordsAcc.value()); logger.info("downloadedRecordsAcc: {}", downloadedRecordsAcc.value());
logger.info("errorsAcc: {}", errorsAcc.value());
}); });
} }

View File

@ -78,7 +78,7 @@
</configuration> </configuration>
</global> </global>
<start to="DownloadOrcidWorks"/> <start to="ResetLambda"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -190,7 +190,7 @@
<arg>-o</arg><arg>downloads/updated_works</arg> <arg>-o</arg><arg>downloads/updated_works</arg>
<arg>-t</arg><arg>${token}</arg> <arg>-t</arg><arg>${token}</arg>
</spark> </spark>
<ok to="End"/> <ok to="ResetNewAuthors"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -7,5 +7,6 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout. # A1 uses PatternLayout.
log4j.logger.org = ERROR log4j.logger.org = ERROR
log4j.logger.eu.dnetlib = DEBUG log4j.logger.eu.dnetlib = DEBUG
log4j.logger.eu.dnetlib.doiboost.orcid = INFO
log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n