1
0
Fork 0

reduced max number of PID in Relatedentity

This commit is contained in:
Sandro La Bruzzo 2021-09-02 14:21:24 +02:00
parent 9f8a80deb7
commit d4dadf6d77
1 changed files with 13 additions and 3 deletions

View File

@ -101,13 +101,22 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache();
final Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false")
.map(
(MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
Encoders
.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
.cache();
.write()
.mode(SaveMode.Overwrite)
.save("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName());
final Dataset<Tuple2<String, RelatedEntity>> entities = spark
.read()
.load("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName())
.as(
Encoders
.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)));
relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
@ -140,7 +149,8 @@ public class CreateRelatedEntitiesJob_phase1 {
re.setId(entity.getId());
re.setType(EntityType.fromClass(clazz).name());
re.setPid(entity.getPid());
if (entity.getPid() != null)
re.setPid(entity.getPid().stream().limit(400).collect(Collectors.toList()));
re.setCollectedfrom(entity.getCollectedfrom());
switch (EntityType.fromClass(clazz)) {