1
0
Fork 0

minor changes

This commit is contained in:
Claudio Atzori 2021-02-02 12:44:04 +01:00
parent bb89b99b24
commit ca4391aa1c
1 changed files with 12 additions and 11 deletions

View File

@ -2,8 +2,7 @@
package eu.dnetlib.dhp.transformation;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.saveDataset;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.writeTotalSizeOnHDFS;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
@ -19,8 +18,6 @@ import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -52,11 +49,14 @@ public class TransformSparkJobNode {
final String mdstoreInputVersion = parser.get("mdstoreInputVersion");
final String mdstoreOutputVersion = parser.get("mdstoreOutputVersion");
// TODO this variable will be used after implementing Messaging with DNet Aggregator
final ObjectMapper jsonMapper = new ObjectMapper();
final MDStoreVersion nativeMdStoreVersion = jsonMapper.readValue(mdstoreInputVersion, MDStoreVersion.class);
final MDStoreVersion cleanedMdStoreVersion = jsonMapper.readValue(mdstoreOutputVersion, MDStoreVersion.class);
final MDStoreVersion nativeMdStoreVersion = MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class);
final String inputPath = nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH;
log.info("input path: {}", inputPath);
final MDStoreVersion cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, MDStoreVersion.class);
final String outputPath = cleanedMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH;
log.info("output path: {}", outputPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info(String.format("isLookupUrl: %s", isLookupUrl));
@ -74,9 +74,10 @@ public class TransformSparkJobNode {
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> transformRecords(
parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH,
cleanedMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH));
spark -> {
transformRecords(
parser.getObjectMap(), isLookupService, spark, inputPath, outputPath);
});
}
public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,