Using newline-delimited JSON files for the raw graph materialization. Introduced the contentPath parameter

This commit is contained in:
Claudio Atzori 2020-04-15 16:16:20 +02:00
parent 3d3ac76dda
commit ff30f99c65
3 changed files with 30 additions and 18 deletions
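The commit switches the raw-graph materialization from parquet to gzip-compressed, newline-delimited JSON: each record is serialized with Jackson and written as one line of text, and reads deserialize line by line back into the Oaf beans. A minimal sketch of that round trip, mirroring the MergeClaimsApplication changes below (the helper names, the OBJECT_MAPPER field and the Oaf bound are assumptions for illustration, not the project's actual API):

    // assumed imports: org.apache.spark.api.java.function.MapFunction,
    // org.apache.spark.sql.Dataset / Encoders / SaveMode / SparkSession,
    // com.fasterxml.jackson.databind.ObjectMapper, eu.dnetlib.dhp.schema.oaf.Oaf
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // write: one JSON object per line, as gzip-compressed text files
    private static <T extends Oaf> void writeAsNdjson(Dataset<T> ds, String outPath) {
        ds.map((MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .text(outPath);
    }

    // read: parse each line back into the bean type
    private static <T extends Oaf> Dataset<T> readNdjson(SparkSession spark, String path, Class<T> clazz) {
        return spark.read()
            .textFile(path)
            .map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

Plain text gives up parquet's columnar pruning, but every record becomes a self-describing JSON line that downstream text-based steps can consume without the bean schema.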

DispatchEntitiesApplication.java

@@ -64,6 +64,7 @@ public class DispatchEntitiesApplication {
     log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
+    /*
     spark.read()
         .textFile(sourcePath)
         .filter((FilterFunction<String>) value -> isEntityType(value, type))
@@ -73,14 +74,13 @@ public class DispatchEntitiesApplication {
         .mode(SaveMode.Overwrite)
         .parquet(targetPath + "/" + type);
-    /*
+    */
     JavaSparkContext.fromSparkContext(spark.sparkContext())
         .textFile(sourcePath)
         .filter(l -> isEntityType(l, type))
         .map(l -> StringUtils.substringAfter(l, "|"))
         .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
-    */
 }

 private static boolean isEntityType(final String line, final String type) {
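The branch enabled above expects each input line to carry the entity type as a prefix separated from the JSON payload by a "|" (hence StringUtils.substringAfter(l, "|")). The body of isEntityType is not part of this diff; under that assumption it presumably reduces to a prefix check along these lines:

    // hypothetical sketch, not the committed implementation
    private static boolean isEntityType(final String line, final String type) {
        return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
    }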

MergeClaimsApplication.java

@@ -100,16 +100,24 @@ public class MergeClaimsApplication {
             return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null;
         }, Encoders.bean(clazz))
         .filter(Objects::nonNull)
+        .map((MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
         .write()
         .mode(SaveMode.Overwrite)
-        .parquet(outPath);
+        .option("compression", "gzip")
+        .text(outPath);
 }

 private static <T extends Oaf> Dataset<T> readFromPath(SparkSession spark, String path, Class<T> clazz) {
+    return spark.read()
+        .textFile(path)
+        .map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
+        .filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
+    /*
     return spark.read()
         .load(path)
         .as(Encoders.bean(clazz))
         .filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
+    */
 }

 private static void removeOutputDir(SparkSession spark, String path) {
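Both new lambdas reference OBJECT_MAPPER from inside Spark closures, so the mapper is presumably declared once as a shared constant on the class (static fields are re-initialized when the class loads on each executor rather than shipped with the closure), for example:

    // assumption: the field itself is not shown in this diff
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

Encoders.bean(clazz) additionally requires the Oaf subclasses to expose a no-arg constructor and getters/setters, which the JSON round trip relies on as well.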

workflow.xml

@@ -10,6 +10,10 @@
         <value>false</value>
         <description>should import content from the aggregator or reuse a previous version</description>
     </property>
+    <property>
+        <name>contentPath</name>
+        <description>path location to store (or reuse) content from the aggregator</description>
+    </property>
     <property>
         <name>postgresURL</name>
         <description>the postgres URL to access to the database</description>
@@ -108,10 +112,10 @@
     <action name="ImportDB_claims">
         <java>
             <prepare>
-                <delete path="${workingDir}/db_claims"/>
+                <delete path="${contentPath}/db_claims"/>
             </prepare>
             <main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
-            <arg>-p</arg><arg>${workingDir}/db_claims</arg>
+            <arg>-p</arg><arg>${contentPath}/db_claims</arg>
             <arg>-pgurl</arg><arg>${postgresURL}</arg>
             <arg>-pguser</arg><arg>${postgresUser}</arg>
             <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
@@ -124,10 +128,10 @@
     <action name="ImportODF_claims">
         <java>
             <prepare>
-                <delete path="${workingDir}/odf_claims"/>
+                <delete path="${contentPath}/odf_claims"/>
             </prepare>
             <main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
-            <arg>-p</arg><arg>${workingDir}/odf_claims</arg>
+            <arg>-p</arg><arg>${contentPath}/odf_claims</arg>
             <arg>-mongourl</arg><arg>${mongoURL}</arg>
             <arg>-mongodb</arg><arg>${mongoDb}</arg>
             <arg>-f</arg><arg>ODF</arg>
@@ -141,10 +145,10 @@
     <action name="ImportOAF_claims">
         <java>
             <prepare>
-                <delete path="${workingDir}/oaf_claims"/>
+                <delete path="${contentPath}/oaf_claims"/>
             </prepare>
             <main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
-            <arg>-p</arg><arg>${workingDir}/oaf_claims</arg>
+            <arg>-p</arg><arg>${contentPath}/oaf_claims</arg>
             <arg>-mongourl</arg><arg>${mongoURL}</arg>
             <arg>-mongodb</arg><arg>${mongoDb}</arg>
             <arg>-f</arg><arg>OAF</arg>
@@ -158,10 +162,10 @@
     <action name="ImportDB">
         <java>
             <prepare>
-                <delete path="${workingDir}/db_records"/>
+                <delete path="${contentPath}/db_records"/>
             </prepare>
             <main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
-            <arg>-p</arg><arg>${workingDir}/db_records</arg>
+            <arg>-p</arg><arg>${contentPath}/db_records</arg>
             <arg>-pgurl</arg><arg>${postgresURL}</arg>
             <arg>-pguser</arg><arg>${postgresUser}</arg>
             <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
@@ -173,10 +177,10 @@
     <action name="ImportODF">
         <java>
             <prepare>
-                <delete path="${workingDir}/odf_records"/>
+                <delete path="${contentPath}/odf_records"/>
             </prepare>
             <main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
-            <arg>-p</arg><arg>${workingDir}/odf_records</arg>
+            <arg>-p</arg><arg>${contentPath}/odf_records</arg>
             <arg>-mongourl</arg><arg>${mongoURL}</arg>
             <arg>-mongodb</arg><arg>${mongoDb}</arg>
             <arg>-f</arg><arg>ODF</arg>
@@ -190,10 +194,10 @@
     <action name="ImportOAF">
         <java>
             <prepare>
-                <delete path="${workingDir}/oaf_records"/>
+                <delete path="${contentPath}/oaf_records"/>
            </prepare>
             <main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
-            <arg>-p</arg><arg>${workingDir}/oaf_records</arg>
+            <arg>-p</arg><arg>${contentPath}/oaf_records</arg>
             <arg>-mongourl</arg><arg>${mongoURL}</arg>
             <arg>-mongodb</arg><arg>${mongoDb}</arg>
             <arg>-f</arg><arg>OAF</arg>
@@ -227,7 +231,7 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>-s</arg><arg>${workingDir}/db_claims,${workingDir}/oaf_claims,${workingDir}/odf_claims</arg>
+            <arg>-s</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims</arg>
             <arg>-t</arg><arg>${workingDir}/entities_claim</arg>
             <arg>-pgurl</arg><arg>${postgresURL}</arg>
             <arg>-pguser</arg><arg>${postgresUser}</arg>
@@ -276,7 +280,7 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>-s</arg><arg>${workingDir}/db_records,${workingDir}/oaf_records,${workingDir}/odf_records</arg>
+            <arg>-s</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
             <arg>-t</arg><arg>${workingDir}/entities</arg>
             <arg>-pgurl</arg><arg>${postgresURL}</arg>
             <arg>-pguser</arg><arg>${postgresUser}</arg>