hotfix: recovered implementation removing the hardcoded working_dirs

This commit is contained in:
Claudio Atzori 2021-10-19 12:35:38 +02:00
parent e15a1969a5
commit e471f12d5e
1 changed files with 18 additions and 21 deletions

View File

@ -52,8 +52,11 @@ public class CreateRelatedEntitiesJob_phase1 {
final String jsonConfiguration = IOUtils final String jsonConfiguration = IOUtils
.toString( .toString(
PrepareRelationsJob.class Objects
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")); .requireNonNull(
CreateRelatedEntitiesJob_phase1.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args); parser.parseArgument(args);
@ -75,6 +78,7 @@ public class CreateRelatedEntitiesJob_phase1 {
final String graphTableClassName = parser.get("graphTableClassName"); final String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName); log.info("graphTableClassName: {}", graphTableClassName);
@SuppressWarnings("unchecked")
final Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName); final Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -101,22 +105,12 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache(); .cache();
readPathEntity(spark, inputEntityPath, clazz) final Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false") .filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)), (MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
Encoders Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) .cache();
.write()
.mode(SaveMode.Overwrite)
.save("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName());
final Dataset<Tuple2<String, RelatedEntity>> entities = spark
.read()
.load("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName())
.as(
Encoders
.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)));
relsByTarget relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
@ -149,8 +143,10 @@ public class CreateRelatedEntitiesJob_phase1 {
re.setId(entity.getId()); re.setId(entity.getId());
re.setType(EntityType.fromClass(clazz).name()); re.setType(EntityType.fromClass(clazz).name());
if (entity.getPid() != null) // TODO move the max number of PIDs to eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits
if (Objects.nonNull(entity.getPid())) {
re.setPid(entity.getPid().stream().limit(400).collect(Collectors.toList())); re.setPid(entity.getPid().stream().limit(400).collect(Collectors.toList()));
}
re.setCollectedfrom(entity.getCollectedfrom()); re.setCollectedfrom(entity.getCollectedfrom());
switch (EntityType.fromClass(clazz)) { switch (EntityType.fromClass(clazz)) {
@ -212,7 +208,7 @@ public class CreateRelatedEntitiesJob_phase1 {
final List<Field<String>> f = p.getFundingtree(); final List<Field<String>> f = p.getFundingtree();
if (!f.isEmpty()) { if (!f.isEmpty()) {
re.setFundingtree(f.stream().map(s -> s.getValue()).collect(Collectors.toList())); re.setFundingtree(f.stream().map(Field::getValue).collect(Collectors.toList()));
} }
break; break;
} }
@ -227,15 +223,16 @@ public class CreateRelatedEntitiesJob_phase1 {
return Optional return Optional
.ofNullable(f) .ofNullable(f)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(x -> x.getValue()) .map(Field::getValue)
.orElse(defaultValue); .orElse(defaultValue);
} }
/** /**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file, * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* file
* *
* @param spark * @param spark the SparkSession
* @param relationPath * @param relationPath the path storing the relation objects
* @return the Dataset<SortableRelation> containing all the relationships * @return the Dataset<SortableRelation> containing all the relationships
*/ */
private static Dataset<Relation> readPathRelation( private static Dataset<Relation> readPathRelation(