dataset based provision WIP

This commit is contained in:
Claudio Atzori 2020-04-01 19:07:30 +02:00
parent 1402eb1fe7
commit 9c7092416a
5 changed files with 32 additions and 40 deletions

View File

@ -45,19 +45,12 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntit
*/ */
public class GraphJoiner_v2 implements Serializable { public class GraphJoiner_v2 implements Serializable {
public static final int LIMIT = 1000000;
private Map<String, LongAccumulator> accumulators = Maps.newHashMap(); private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
public static final int MAX_RELS = 100; public static final int MAX_RELS = 100;
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
private static final StructType KV_SCHEMA = StructType$.MODULE$.apply(
Arrays.asList(
StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()),
StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
));
private SparkSession spark; private SparkSession spark;
private ContextMapper contextMapper; private ContextMapper contextMapper;
@ -105,7 +98,6 @@ public class GraphJoiner_v2 implements Serializable {
value.getId(), value.getId(),
value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
.limit(LIMIT)
.cache(); .cache();
System.out.println("Entities schema:"); System.out.println("Entities schema:");
@ -115,7 +107,6 @@ public class GraphJoiner_v2 implements Serializable {
Dataset<Relation> rels = readPathRelation(jsc, getInputPath()) Dataset<Relation> rels = readPathRelation(jsc, getInputPath())
.groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class)) .groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class))
.flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class)) .flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class))
.limit(LIMIT)
.cache(); .cache();
System.out.println("Relation schema:"); System.out.println("Relation schema:");
@ -169,7 +160,6 @@ public class GraphJoiner_v2 implements Serializable {
final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId); final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
grouped grouped
.map((MapFunction<JoinedEntity, String>) value -> recordFactory.build(value), Encoders.STRING()) .map((MapFunction<JoinedEntity, String>) value -> recordFactory.build(value), Encoders.STRING())
.limit(LIMIT)
.write() .write()
.text(getOutPath() + "/xml"); .text(getOutPath() + "/xml");
/* /*
@ -245,13 +235,11 @@ public class GraphJoiner_v2 implements Serializable {
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier * @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
*/ */
private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) { private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
RDD<Row> rdd = sc.textFile(inputPath + "/" + type) RDD<String> rdd = sc.textFile(inputPath + "/" + type)
.map((Function<String, Row>) s -> RowFactory.create("", s))
.rdd(); .rdd();
return getSpark().createDataFrame(rdd, KV_SCHEMA) return getSpark().createDataset(rdd, Encoders.STRING())
.map((MapFunction<Row, TypedRow>) row -> { .map((MapFunction<String, TypedRow>) s -> {
final String s = row.getAs("value");
final DocumentContext json = JsonPath.parse(s); final DocumentContext json = JsonPath.parse(s);
final TypedRow t = new TypedRow(); final TypedRow t = new TypedRow();
t.setId(json.read("$.id")); t.setId(json.read("$.id"));
@ -270,12 +258,11 @@ public class GraphJoiner_v2 implements Serializable {
* @return the JavaRDD<TypedRow> containing all the relationships * @return the JavaRDD<TypedRow> containing all the relationships
*/ */
private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) { private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) {
final RDD<Row> rdd = sc.textFile(inputPath + "/relation") final RDD<String> rdd = sc.textFile(inputPath + "/relation")
.map((Function<String, Row>) s -> RowFactory.create("", s))
.rdd(); .rdd();
return getSpark().createDataFrame(rdd, KV_SCHEMA) return getSpark().createDataset(rdd, Encoders.STRING())
.map((MapFunction<Row, Relation>) value -> new ObjectMapper().readValue(value.<String>getAs("value"), Relation.class), Encoders.bean(Relation.class)); .map((MapFunction<String, Relation>) s -> new ObjectMapper().readValue(s, Relation.class), Encoders.bean(Relation.class));
} }
private ObjectMapper getObjectMapper() { private ObjectMapper getObjectMapper() {

View File

@ -17,23 +17,23 @@ public class SparkXmlRecordBuilderJob_v2 {
SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
parser.parseArgument(args); parser.parseArgument(args);
final String master = parser.get("master"); try(SparkSession spark = getSession(parser)) {
try(SparkSession spark = getSession(master)) {
final String inputPath = parser.get("sourcePath"); final String inputPath = parser.get("sourcePath");
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
final String isLookupUrl = parser.get("isLookupUrl"); final String isLookupUrl = parser.get("isLookupUrl");
final String otherDsTypeId = parser.get("otherDsTypeId"); final String otherDsTypeId = parser.get("otherDsTypeId");
new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath) new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath)
.adjacencyLists(); .adjacencyLists();
} }
} }
private static SparkSession getSession(String master) { private static SparkSession getSession(ArgumentApplicationParser parser) {
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.sql.shuffle.partitions", "500"); conf.set("spark.sql.shuffle.partitions", parser.get("sparkSqlShufflePartitions"));
conf.registerKryoClasses(new Class[]{ conf.registerKryoClasses(new Class[]{
Author.class, Author.class,
Context.class, Context.class,
@ -74,7 +74,7 @@ public class SparkXmlRecordBuilderJob_v2 {
.builder() .builder()
.config(conf) .config(conf)
.appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName()) .appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName())
.master(master) .master(parser.get("master"))
.getOrCreate(); .getOrCreate();
} }

View File

@ -3,5 +3,6 @@
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true}, {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true}, {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true} {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true},
{"paramName":"sp", "paramLongName":"sparkSqlShufflePartitions", "paramDescription": "Configures the number of partitions to use when shuffling data for joins or aggregations", "paramRequired": true}
] ]

View File

@ -19,13 +19,9 @@
<name>hive_metastore_uris</name> <name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value> <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property> </property>
<property>
<name>hive_db_name</name>
<value>openaire</value>
</property>
<property> <property>
<name>spark2YarnHistoryServerAddress</name> <name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value> <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property> </property>
<property> <property>
<name>spark2EventLogDir</name> <name>spark2EventLogDir</name>

View File

@ -2,19 +2,27 @@
<parameters> <parameters>
<property> <property>
<name>hive_db_name</name> <name>sparkDriverMemoryForJoining</name>
<description>the target hive database name</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
</property> </property>
<property> <property>
<name>sparkExecutorMemory</name> <name>sparkExecutorMemoryForJoining</name>
<description>memory for individual executor</description> <description>memory for individual executor</description>
</property> </property>
<property> <property>
<name>sparkExecutorCores</name> <name>sparkExecutorCoresForJoining</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>sparkDriverMemoryForIndexing</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemoryForIndexing</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCoresForIndexing</name>
<description>number of cores used by single executor</description> <description>number of cores used by single executor</description>
</property> </property>
<property> <property>
@ -75,13 +83,13 @@
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.network.timeout=10000000
</spark-opts> </spark-opts>
<arg>-mt</arg> <arg>yarn</arg> <arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg> <arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>-t</arg> <arg>${otherDsTypeId}</arg> <arg>-t</arg> <arg>${otherDsTypeId}</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>-s</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>-o</arg><arg>${outputPath}</arg>
<arg>-sp</arg><arg>${sparkSqlShufflePartitions}</arg>
</spark> </spark>
<ok to="to_solr_index"/> <ok to="to_solr_index"/>
<error to="Kill"/> <error to="Kill"/>