package eu.dnetlib.pace.util;

import com.google.common.collect.Lists;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.WfConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.*;
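
/**
 * Processes a block of candidate {@link MapDocument}s that share the same clustering
 * key: it orders the records by the configured field, prunes oversized or malformed
 * groups, and compares the remaining records within a sliding window, emitting a
 * similarity pair whenever the distance score reaches the configured threshold.
 *
 * A minimal usage sketch (how the {@code DedupConfig} is loaded and which
 * {@code Reporter} implementation is used are up to the caller; both are assumed
 * here, not mandated by this class):
 *
 * <pre>
 *   BlockProcessor.constructAccumulator(dedupConf); // register counter names once
 *   new BlockProcessor(dedupConf).process(key, documents, reporter);
 * </pre>
 */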
public class BlockProcessor {

    public static final List<String> accumulators = new ArrayList<>();

    private static final Log log = LogFactory.getLog(BlockProcessor.class);

    private DedupConfig dedupConf;
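
    /**
     * Pre-registers the names of every counter this processor increments, namespaced as
     * "entityType::counterName", so that callers can create the matching accumulators up
     * front (e.g. Spark or Hadoop counters; which execution engine backs the Reporter is
     * an assumption of the caller, not of this class).
     */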
    public static void constructAccumulator(final DedupConfig dedupConf) {
        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1"));
        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()));
        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize())));
        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list"));
        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)"));
        accumulators.add(String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()));
    }

    public BlockProcessor(DedupConfig dedupConf) {
        this.dedupConf = dedupConf;
    }
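
    /**
     * Entry point for one block: deduplicates and orders the documents, simplifies the
     * resulting queue, then compares the records pairwise. A block that collapses to a
     * single record is only counted, since it cannot produce a pair.
     */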
    public void process(final String key, final Iterable<MapDocument> documents, final Reporter context) {

        final Queue<MapDocument> q = prepare(documents);

        if (q.size() > 1) {
//            log.info("reducing key: '" + key + "' records: " + q.size());
//            process(q, context);
            process(simplifyQueue(q, key, context), context);
        } else {
            context.incrementCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1", 1);
        }
    }
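
    /**
     * Builds a queue ordered by the configured order field, dropping duplicate
     * identifiers and bounding the size by queueMaxSize (the bound is checked
     * before each insertion).
     */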
    private Queue<MapDocument> prepare(final Iterable<MapDocument> documents) {
        final Queue<MapDocument> queue = new PriorityQueue<>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));

        final Set<String> seen = new HashSet<String>();
        final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();

        documents.forEach(doc -> {
            if (queue.size() <= queueMaxSize) {
                final String id = doc.getIdentifier();

                // insert each identifier at most once
                if (!seen.contains(id)) {
                    seen.add(id);
                    queue.add(doc);
                }
            }
        });

        return queue;
    }
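
    /**
     * Walks the ordered queue grouping consecutive records whose cleaned order-field
     * values are equal, flushing each group through populateSimplifiedQueue so that
     * oversized groups of identical values can be skipped; records missing the order
     * field are dropped and counted.
     */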
    private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Reporter context) {
        final Queue<MapDocument> q = new LinkedList<>();

        String fieldRef = "";
        final List<MapDocument> tempResults = Lists.newArrayList();

        while (!queue.isEmpty()) {
            final MapDocument result = queue.remove();

            final String orderFieldName = dedupConf.getWf().getOrderField();
            final Field orderFieldValue = result.values(orderFieldName);
            if (!orderFieldValue.isEmpty()) {
                final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
                if (field.equals(fieldRef)) {
                    tempResults.add(result);
                } else {
                    populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
                    tempResults.clear();
                    tempResults.add(result);
                    fieldRef = field;
                }
            } else {
                context.incrementCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField(), 1);
            }
        }
        populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

        return q;
    }
    private void populateSimplifiedQueue(final Queue<MapDocument> q,
                                         final List<MapDocument> tempResults,
                                         final Reporter context,
                                         final String fieldRef,
                                         final String ngram) {
        WfConfig wf = dedupConf.getWf();
        if (tempResults.size() < wf.getGroupMaxSize()) {
            q.addAll(tempResults);
        } else {
            context.incrementCounter(wf.getEntityType(), String.format("Skipped records for count(%s) >= %s", wf.getOrderField(), wf.getGroupMaxSize()), tempResults.size());
//            log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
        }
    }
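
    /**
     * Sliding-window pass: each record is taken as pivot and compared with the records
     * that follow it in the queue, up to the configured slidingWindowSize, skipping
     * pivots that lack the order field.
     */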
    private void process(final Queue<MapDocument> queue, final Reporter context) {

        final PaceDocumentDistance algo = new PaceDocumentDistance();

        while (!queue.isEmpty()) {

            final MapDocument pivot = queue.remove();
            final String idPivot = pivot.getIdentifier();

            WfConfig wf = dedupConf.getWf();
            final Field fieldsPivot = pivot.values(wf.getOrderField());
            final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

            if (fieldPivot != null) {
//                System.out.println(idPivot + " --> " + fieldPivot);

                int i = 0;
                for (final MapDocument curr : queue) {
                    final String idCurr = curr.getIdentifier();

                    // a skip-listed record aborts the whole window for this pivot
                    if (mustSkip(idCurr)) {
                        context.incrementCounter(wf.getEntityType(), "skip list", 1);
                        break;
                    }

                    // only compare within the sliding window
                    if (i > wf.getSlidingWindowSize()) {
                        break;
                    }

                    final Field fieldsCurr = curr.values(wf.getOrderField());
                    final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

                    if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
                        final ScoreResult sr = similarity(algo, pivot, curr);
//                        log.info(sr.toString() + "SCORE " + sr.getScore());
                        emitOutput(sr, idPivot, idCurr, context);
                        i++;
                    }
                }
            }
        }
    }
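
    /**
     * Writes the pair (in both directions) when the score reaches the configured
     * threshold; otherwise only the below-threshold counter is incremented.
     */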
    private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Reporter context) {
        final double d = sr.getScore();

        if (d >= dedupConf.getWf().getThreshold()) {
            writeSimilarity(context, idPivot, idCurr);
            context.incrementCounter(dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)", 1);
        } else {
            context.incrementCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold(), 1);
        }
    }
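
    /**
     * Delegates the distance computation to PaceDocumentDistance, logging both
     * documents and rethrowing as IllegalArgumentException on any failure.
     */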
    private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
        try {
            return algo.between(a, b, dedupConf);
        } catch (Throwable e) {
            log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
            throw new IllegalArgumentException(e);
        }
    }
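
    /**
     * A record is skipped when its namespace prefix is on the configured skip list;
     * the prefix is read as the substring between '|' and '::', which assumes
     * OpenAIRE-style identifiers such as "50|prefix::hash" (an assumption inferred
     * from the parsing, not stated in this class).
     */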
    private boolean mustSkip(final String idPivot) {
        return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
    }

    private String getNsPrefix(final String id) {
        return StringUtils.substringBetween(id, "|", "::");
    }
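
    /**
     * Emits the similarity relation in both directions, (from, to) and (to, from),
     * which is why the corresponding counter is named "dedupSimilarity (x2)".
     */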
    private void writeSimilarity(final Reporter context, final String from, final String to) {
        final String type = dedupConf.getWf().getEntityType();

        context.emit(type, from, to);
        context.emit(type, to, from);
    }
}