diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 90a99926e..e62b4b4fc 100644 --- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -25,6 +25,7 @@ import eu.dnetlib.dhp.schema.oaf.*; public class SparkBulkTagJob { private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class); + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -108,12 +109,12 @@ public class SparkBulkTagJob { .json(outputPath); } - private static Dataset readPath( - SparkSession spark, String inputEntityPath, Class clazz) { + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { return spark .read() - .json(inputEntityPath) - .as(Encoders.bean(clazz)); + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } }