master #59

Closed
claudio.atzori wants to merge 3221 commits from master into stable_ids
1 changed files with 9 additions and 5 deletions
Showing only changes of commit d147295c2f - Show all commits

View File

@ -31,6 +31,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
public class CopyHdfsOafApplication extends AbstractMigrationApplication { public class CopyHdfsOafApplication extends AbstractMigrationApplication {
@ -73,10 +74,13 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, hdfsPath, paths)); final List<String> oafTypes = Lists.newArrayList(ModelSupport.oafTypes.keySet());
runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, oafTypes, hdfsPath, paths));
} }
public static void processPaths(final SparkSession spark, public static void processPaths(final SparkSession spark,
final List<String> oafTypes,
final String outputPath, final String outputPath,
final Set<String> paths) { final Set<String> paths) {
@ -99,14 +103,14 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication {
.as(Encoders.kryo(Oaf.class)); .as(Encoders.kryo(Oaf.class));
// dispatch each entity type individually in the respective graph subdirectory in append mode // dispatch each entity type individually in the respective graph subdirectory in append mode
for (Map.Entry<String, Class> e : ModelSupport.oafTypes.entrySet()) { for (String type : oafTypes) {
oaf oaf
.filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey())) .filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(type))
.map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.STRING()) .map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.STRING())
.write() .write()
.option("compression", "gzip") .option("compression", "gzip")
.mode(SaveMode.Append) .mode(SaveMode.Append)
.text(outputPath + "/" + e.getKey()); .text(outputPath + "/" + type);
} }
} }
} }