This commit is contained in:
Michele Artini 2020-07-10 16:12:08 +02:00
parent 2d742a84ae
commit e1ae964bc4
7 changed files with 194 additions and 12 deletions

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
/**
 * Spark job that turns the broker events found under {@code <workingPath>/events} into one
 * {@link DatasourceStats} record per target datasource, written under {@code <workingPath>/stats}.
 */
public class GenerateStatsJob {

    private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);

    /**
     * Entry point of the job.
     *
     * <p>Recognised arguments (declared in {@code common_params.json}):
     * <ul>
     * <li>{@code isSparkSessionManaged} - optional, defaults to {@code true} when absent;</li>
     * <li>{@code workingPath} - base path; events are read from {@code /events} and the statistics are
     * written to {@code /stats} below it.</li>
     * </ul>
     *
     * @param args command line arguments, parsed by {@link ArgumentApplicationParser}
     * @throws Exception if the arguments cannot be parsed or the Spark job fails
     */
    public static void main(final String[] args) throws Exception {

        // Load the parameter definitions through this job's own class rather than an unrelated job class.
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    GenerateStatsJob.class
                        .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final SparkConf conf = new SparkConf();

        final String eventsPath = parser.get("workingPath") + "/events";
        log.info("eventsPath: {}", eventsPath);

        final String statsPath = parser.get("workingPath") + "/stats";
        log.info("stats: {}", statsPath);

        final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();

        runWithSparkSession(conf, isSparkSessionManaged, spark -> {

            // Group the events by the id of their target datasource and fold every group into one stats bean;
            // the grouping key is dropped afterwards because the bean is what gets saved.
            final Dataset<DatasourceStats> stats = ClusterUtils
                .readPath(spark, eventsPath, Event.class)
                .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING())
                .agg(aggr)
                .map(t -> t._2, Encoders.bean(DatasourceStats.class));

            // NOTE(review): the last argument is passed as null - presumably an optional counter/accumulator;
            // confirm against ClusterUtils.save.
            ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
        });
    }

}

View File

@ -7,7 +7,6 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
@ -65,9 +64,7 @@ public class JoinStep2Job {
final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
Encoders.STRING())
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 * Serializable bean with the statistics collected for a single datasource: its identifier, name and
 * type, plus a counter for every topic.
 *
 * <p>The class keeps a public no-arg constructor and plain getters/setters so that it can be handled
 * as a Java bean.
 */
public class DatasourceStats implements Serializable {

    private static final long serialVersionUID = -282112564184047677L;

    /** Identifier of the datasource. */
    private String id;

    /** Name of the datasource. */
    private String name;

    /** Type of the datasource. */
    private String type;

    /** Counters keyed by topic; starts as an empty, mutable map. */
    private Map<String, Long> topics = new HashMap<>();

    /**
     * @return the identifier of the datasource
     */
    public String getId() {
        return id;
    }

    /**
     * @param id the identifier of the datasource
     */
    public void setId(final String id) {
        this.id = id;
    }

    /**
     * @return the name of the datasource
     */
    public String getName() {
        return name;
    }

    /**
     * @param name the name of the datasource
     */
    public void setName(final String name) {
        this.name = name;
    }

    /**
     * @return the type of the datasource
     */
    public String getType() {
        return type;
    }

    /**
     * @param type the type of the datasource
     */
    public void setType(final String type) {
        this.type = type;
    }

    /**
     * @return the live (not copied) map of counters keyed by topic
     */
    public Map<String, Long> getTopics() {
        return topics;
    }

    /**
     * Replaces the whole map of counters; the given instance is stored as is, without copying.
     *
     * @param topics the new map of counters keyed by topic
     */
    public void setTopics(final Map<String, Long> topics) {
        this.topics = topics;
    }

    /**
     * Adds {@code inc} to the counter of {@code topic}, creating the counter when the topic is new.
     *
     * @param topic the topic whose counter is updated
     * @param inc   the amount to add
     */
    public void incrementTopic(final String topic, final long inc) {
        // Map.merge does a single lookup and treats a missing (or null) counter as "start from inc".
        topics.merge(topic, inc, Long::sum);
    }

}

View File

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.broker.model.Event;
/**
 * Typed aggregator that folds {@link Event}s into a {@link DatasourceStats}: the datasource identity is
 * copied from the events and each event adds one to the counter of its topic.
 */
public class StatsAggregator extends Aggregator<Event, DatasourceStats, DatasourceStats> {

    private static final long serialVersionUID = 6652105853037330529L;

    /**
     * @return a fresh, empty accumulator
     */
    @Override
    public DatasourceStats zero() {
        return new DatasourceStats();
    }

    /**
     * Folds one event into the accumulator.
     *
     * @param stats the accumulator, updated in place
     * @param e     the event to account for
     * @return the same accumulator instance
     */
    @Override
    public DatasourceStats reduce(final DatasourceStats stats, final Event e) {
        // The identity fields are overwritten by every event.
        stats.setId(e.getMap().getTargetDatasourceId());
        stats.setName(e.getMap().getTargetDatasourceName());
        stats.setType(e.getMap().getTargetDatasourceType());

        stats.incrementTopic(e.getTopic(), 1L);

        return stats;
    }

    /**
     * Merges two partial accumulators into the first one.
     *
     * @param stats0 the accumulator receiving the merge, updated in place
     * @param stats1 the accumulator whose counters are added to {@code stats0}
     * @return {@code stats0}
     */
    @Override
    public DatasourceStats merge(final DatasourceStats stats0, final DatasourceStats stats1) {
        // Take the identity from the other side only when this side has a blank id.
        final boolean identityMissing = StringUtils.isBlank(stats0.getId());
        if (identityMissing) {
            stats0.setId(stats1.getId());
            stats0.setName(stats1.getName());
            stats0.setType(stats1.getType());
        }

        stats1.getTopics().forEach(stats0::incrementTopic);

        return stats0;
    }

    /**
     * @return the bean encoder used for the intermediate accumulator
     */
    @Override
    public Encoder<DatasourceStats> bufferEncoder() {
        return statsEncoder();
    }

    /**
     * @return the accumulator itself, unchanged
     */
    @Override
    public DatasourceStats finish(final DatasourceStats stats) {
        return stats;
    }

    /**
     * @return the bean encoder used for the final result
     */
    @Override
    public Encoder<DatasourceStats> outputEncoder() {
        return statsEncoder();
    }

    /** Builds the bean encoder shared by the buffer and the output. */
    private static Encoder<DatasourceStats> statsEncoder() {
        return Encoders.bean(DatasourceStats.class);
    }

}

View File

@ -64,19 +64,19 @@
</configuration>
</global>
<start to="count"/>
<start to="stats"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="count">
<action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Count</name>
<class>eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob</class>
<name>GenerateStatsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}

View File

@ -47,7 +47,8 @@ public class EntityMergerTest implements Serializable {
@Test
public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
List<Tuple2<String, Software>> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class);
List<Tuple2<String, Software>> softwares = readSample(
testEntityBasePath + "/software_merge.json", Software.class);
Software merged = DedupRecordFactory
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);

View File

@ -8,7 +8,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -24,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -151,7 +151,8 @@ public class CleanGraphSparkJob {
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
r.setBestaccessright(
r
.setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);