dnet-hadoop/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java

package eu.dnetlib.dhp.oa.dedup;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.util.LongAccumulator;

import eu.dnetlib.pace.util.Reporter;
import scala.Serializable;
import scala.Tuple2;
/**
 * Reporter implementation backed by Spark {@link LongAccumulator}s: counter increments are routed to the
 * accumulator registered under "&lt;group&gt;::&lt;name&gt;", while emitted pairs are collected in memory.
 */
public class SparkReporter implements Serializable, Reporter {

    private final List<Tuple2<String, String>> relations = new ArrayList<>();
    private final Map<String, LongAccumulator> accumulators;
    public SparkReporter(Map<String, LongAccumulator> accumulators) {
        this.accumulators = accumulators;
    }
    // adds delta to the accumulator registered under "<counterGroup>::<counterName>", if present
    public void incrementCounter(
        String counterGroup,
        String counterName,
        long delta,
        Map<String, LongAccumulator> accumulators) {
        final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
        if (accumulators.containsKey(accumulatorName)) {
            accumulators.get(accumulatorName).add(delta);
        }
    }
    @Override
    public void incrementCounter(String counterGroup, String counterName, long delta) {
        incrementCounter(counterGroup, counterName, delta, accumulators);
    }
    @Override
    public void emit(String type, String from, String to) {
        // the relation type is not stored here; only the (from, to) pair is collected
        relations.add(new Tuple2<>(from, to));
    }
    public List<Tuple2<String, String>> getRelations() {
        return relations;
    }
}
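
For context, a minimal usage sketch follows, assuming a local SparkSession; the counter group "dedup", counter name "comparisons", and relation type "similarity" are illustrative placeholders, not values taken from the project. The only requirement implied by the class above is that accumulators are registered under the "<group>::<name>" key that incrementCounter builds.

package eu.dnetlib.dhp.oa.dedup;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class SparkReporterUsageSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("SparkReporter usage sketch")
            .master("local[*]")
            .getOrCreate();

        // register one accumulator per counter, keyed as "<group>::<name>" to match SparkReporter
        Map<String, LongAccumulator> accumulators = new HashMap<>();
        String key = "dedup::comparisons"; // hypothetical group/name
        accumulators.put(key, spark.sparkContext().longAccumulator(key));

        SparkReporter reporter = new SparkReporter(accumulators);

        // counter increments are routed to the matching accumulator ...
        reporter.incrementCounter("dedup", "comparisons", 1);
        // ... while emitted pairs are collected for later retrieval
        reporter.emit("similarity", "id1", "id2");

        System.out.println(accumulators.get(key).value()); // 1
        System.out.println(reporter.getRelations());       // [(id1,id2)]

        spark.stop();
    }
}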