fixed a bug with duplicated events

This commit is contained in:
Michele Artini 2020-07-07 15:37:13 +02:00
parent edf6c6c4dc
commit efadbdb2bc
9 changed files with 192 additions and 47 deletions

View File

@ -0,0 +1,112 @@
package eu.dnetlib.dhp.broker.oa;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import scala.Tuple2;
public class CheckDuplictedIdsJob {
private static final Logger log = LoggerFactory.getLogger(CheckDuplictedIdsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
CheckDuplictedIdsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String countPath = parser.get("workingPath") + "/counts";
log.info("countPath: {}", countPath);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final LongAccumulator total = spark.sparkContext().longAccumulator("invaild_event_id");
final TypedColumn<Tuple2<String, Long>, Tuple2<String, Long>> agg = new CountAggregator().toColumn();
ClusterUtils
.readPath(spark, eventsPath, Event.class)
.map(e -> new Tuple2<>(e.getEventId(), 1l), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
.groupByKey(t -> t._1, Encoders.STRING())
.agg(agg)
.map(t -> t._2, Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
.filter(t -> t._2 > 1)
.map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
.write()
.mode(SaveMode.Overwrite)
.json(countPath);
;
}
private static String eventAsJsonString(final Event f) throws JsonProcessingException {
return new ObjectMapper().writeValueAsString(f);
}
}
class CountAggregator extends Aggregator<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {
/**
*
*/
private static final long serialVersionUID = 1395935985734672538L;
@Override
public Encoder<Tuple2<String, Long>> bufferEncoder() {
return Encoders.tuple(Encoders.STRING(), Encoders.LONG());
}
@Override
public Tuple2<String, Long> finish(final Tuple2<String, Long> arg0) {
return arg0;
}
@Override
public Tuple2<String, Long> merge(final Tuple2<String, Long> arg0, final Tuple2<String, Long> arg1) {
final String s = StringUtils.defaultIfBlank(arg0._1, arg1._1);
return new Tuple2<>(s, arg0._2 + arg1._2);
}
@Override
public Encoder<Tuple2<String, Long>> outputEncoder() {
return Encoders.tuple(Encoders.STRING(), Encoders.LONG());
}
@Override
public Tuple2<String, Long> reduce(final Tuple2<String, Long> arg0, final Tuple2<String, Long> arg1) {
final String s = StringUtils.defaultIfBlank(arg0._1, arg1._1);
return new Tuple2<>(s, arg0._2 + arg1._2);
}
@Override
public Tuple2<String, Long> zero() {
return new Tuple2<>(null, 0l);
}
}

View File

@ -3,8 +3,6 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -63,13 +61,13 @@ public class GenerateEventsJob {
final String eventsPath = workingPath + "/events"; final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final Set<String> dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist"); final Set<String> dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist");
log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ",")); log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ","));
final Set<String> dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist"); final Set<String> dsTypeWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceTypeWhitelist");
log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ",")); log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ","));
final Set<String> dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist"); final Set<String> dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist");
log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ",")); log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -103,22 +101,6 @@ public class GenerateEventsJob {
} }
private static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
final String s = parser.get(key).trim();
final Set<String> res = new HashSet<>();
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
Arrays
.stream(s.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.forEach(res::add);
}
return res;
}
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) { public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
return EventFinder return EventFinder

View File

@ -48,7 +48,7 @@ public class IndexOnESJob {
final JavaRDD<String> inputRdd = ClusterUtils final JavaRDD<String> inputRdd = ClusterUtils
.readPath(spark, eventsPath, Event.class) .readPath(spark, eventsPath, Event.class)
.limit(10000) // TODO REMOVE // .limit(10000) // TODO REMOVE
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING()) .map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
.javaRDD(); .javaRDD();

View File

@ -25,7 +25,9 @@ public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source, protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source,
final OaBrokerMainEntity target) { final OaBrokerMainEntity target) {
if (target.getProjects().size() >= BrokerConstants.MAX_LIST_SIZE) { return new ArrayList<>(); } if (target.getProjects().size() >= BrokerConstants.MAX_LIST_SIZE) {
return new ArrayList<>();
}
final Set<String> existingProjects = target final Set<String> existingProjects = target
.getProjects() .getProjects()

View File

@ -1,6 +1,11 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
@ -10,6 +15,7 @@ import org.apache.spark.util.LongAccumulator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
public class ClusterUtils { public class ClusterUtils {
@ -53,7 +59,9 @@ public class ClusterUtils {
return o; return o;
} }
public static <T> void save(final Dataset<T> dataset, final String path, final Class<T> clazz, public static <T> void save(final Dataset<T> dataset,
final String path,
final Class<T> clazz,
final LongAccumulator acc) { final LongAccumulator acc) {
dataset dataset
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz)) .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
@ -62,4 +70,20 @@ public class ClusterUtils {
.json(path); .json(path);
} }
public static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
final String s = parser.get(key).trim();
final Set<String> res = new HashSet<>();
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
Arrays
.stream(s.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.forEach(res::add);
}
return res;
}
} }

View File

@ -44,7 +44,9 @@ public class ConversionUtils {
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
public static List<OaBrokerInstance> oafInstanceToBrokerInstances(final Instance i) { public static List<OaBrokerInstance> oafInstanceToBrokerInstances(final Instance i) {
if (i == null) { return new ArrayList<>(); } if (i == null) {
return new ArrayList<>();
}
return mappedList(i.getUrl(), url -> { return mappedList(i.getUrl(), url -> {
final OaBrokerInstance res = new OaBrokerInstance(); final OaBrokerInstance res = new OaBrokerInstance();
@ -65,7 +67,9 @@ public class ConversionUtils {
} }
public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) { public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) {
if (d == null) { return null; } if (d == null) {
return null;
}
final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset(); final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset();
res.setOpenaireId(d.getId()); res.setOpenaireId(d.getId());
@ -78,7 +82,9 @@ public class ConversionUtils {
} }
public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) { public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) {
if (p == null) { return null; } if (p == null) {
return null;
}
final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication(); final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication();
res.setOpenaireId(p.getId()); res.setOpenaireId(p.getId());
@ -92,7 +98,9 @@ public class ConversionUtils {
} }
public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) { public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) {
if (result == null) { return null; } if (result == null) {
return null;
}
final OaBrokerMainEntity res = new OaBrokerMainEntity(); final OaBrokerMainEntity res = new OaBrokerMainEntity();
@ -109,7 +117,8 @@ public class ConversionUtils {
res.setEmbargoenddate(fieldValue(result.getEmbargoenddate())); res.setEmbargoenddate(fieldValue(result.getEmbargoenddate()));
res.setContributor(fieldList(result.getContributor())); res.setContributor(fieldList(result.getContributor()));
res res
.setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); .setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey)); res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
@ -121,7 +130,9 @@ public class ConversionUtils {
} }
private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) { private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) {
if (author == null) { return null; } if (author == null) {
return null;
}
final String pids = author.getPid() != null ? author final String pids = author.getPid() != null ? author
.getPid() .getPid()
@ -145,7 +156,9 @@ public class ConversionUtils {
} }
private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) { private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) {
if (journal == null) { return null; } if (journal == null) {
return null;
}
final OaBrokerJournal res = new OaBrokerJournal(); final OaBrokerJournal res = new OaBrokerJournal();
res.setName(journal.getName()); res.setName(journal.getName());
@ -157,7 +170,9 @@ public class ConversionUtils {
} }
private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
if (ref == null) { return null; } if (ref == null) {
return null;
}
final OaBrokerExternalReference res = new OaBrokerExternalReference(); final OaBrokerExternalReference res = new OaBrokerExternalReference();
res.setRefidentifier(ref.getRefidentifier()); res.setRefidentifier(ref.getRefidentifier());
@ -168,7 +183,9 @@ public class ConversionUtils {
} }
public static final OaBrokerProject oafProjectToBrokerProject(final Project p) { public static final OaBrokerProject oafProjectToBrokerProject(final Project p) {
if (p == null) { return null; } if (p == null) {
return null;
}
final OaBrokerProject res = new OaBrokerProject(); final OaBrokerProject res = new OaBrokerProject();
res.setOpenaireId(p.getId()); res.setOpenaireId(p.getId());
@ -192,7 +209,9 @@ public class ConversionUtils {
} }
public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) { public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) {
if (sw == null) { return null; } if (sw == null) {
return null;
}
final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware(); final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware();
res.setOpenaireId(sw.getId()); res.setOpenaireId(sw.getId());
@ -255,7 +274,9 @@ public class ConversionUtils {
} }
private static List<OaBrokerTypedValue> structPropTypedList(final List<StructuredProperty> list) { private static List<OaBrokerTypedValue> structPropTypedList(final List<StructuredProperty> list) {
if (list == null) { return new ArrayList<>(); } if (list == null) {
return new ArrayList<>();
}
return list return list
.stream() .stream()
@ -265,7 +286,9 @@ public class ConversionUtils {
} }
private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) { private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
if (list == null) { return new ArrayList<>(); } if (list == null) {
return new ArrayList<>();
}
return list return list
.stream() .stream()
@ -276,7 +299,9 @@ public class ConversionUtils {
} }
private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) { private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
if (list == null) { return new ArrayList<>(); } if (list == null) {
return new ArrayList<>();
}
return list return list
.stream() .stream()
@ -288,7 +313,9 @@ public class ConversionUtils {
} }
private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) { private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
if (list == null) { return null; } if (list == null) {
return null;
}
return list return list
.stream() .stream()

View File

@ -70,7 +70,6 @@ public class EventFinder {
matchers.add(new EnrichMissingDatasetReferences()); matchers.add(new EnrichMissingDatasetReferences());
matchers.add(new EnrichMissingDatasetIsSupplementedTo()); matchers.add(new EnrichMissingDatasetIsSupplementedTo());
matchers.add(new EnrichMissingDatasetIsSupplementedBy()); matchers.add(new EnrichMissingDatasetIsSupplementedBy());
matchers.add(new EnrichMissingAbstract());
} }
public static EventGroup generateEvents(final ResultGroup results, public static EventGroup generateEvents(final ResultGroup results,

View File

@ -447,7 +447,7 @@
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="2" --conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -73,33 +73,32 @@
</configuration> </configuration>
</global> </global>
<start to="index_es"/> <start to="count"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="index_es"> <action name="count">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>IndexOnESJob</name> <name>Count</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class> <class>eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="2"
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>