record linkage, a real use case with spark ml · df1.join(df2, (df1("id_1") < df2 ......

40
RECORD LINKAGE, A REAL USE CASE WITH SPARK ML Alexis Seigneurin

Upload: buidung

Post on 30-Apr-2018

223 views

Category:

Documents


2 download

TRANSCRIPT

Page 1: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

RECORD LINKAGE,A REAL USE CASE WITH SPARK ML

Alexis Seigneurin

Page 2: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Who I am

• Software engineer for 15 years

• Consultant at Ippon USA, previously at Ippon France

• Favorite subjects: Spark, Machine Learning, Cassandra

• Spark trainer

• @aseigneurin

Page 3: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• 200 software engineers in France and the US

• In the US: offices in DC, NYC and Richmond, Virginia

• Digital, Big Data and Cloud applications

• Java & Agile expertise

• Open-source projects: JHipster, Tatami, etc.

• @ipponusa

Page 4: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

The project

• Record Linkage with Machine learning

• Use cases:• Find new clients who come from insurance comparison services→ Commission

• Find duplicates in existing files (acquisitions)

• Record Linkage• Entity resolution• Deduplication• Entity disambiguation• …

Page 5: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Overview

Page 6: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Find duplicates!

Purpose

+---+-------+------------+----------+------------+---------+------------+ | ID| veh|codptgar_veh|dt_nais_cp|dt_permis_cp|redmaj_cp| formule| +---+-------+------------+----------+------------+---------+------------+ |...|PE28221| 50000|1995-10-12| 2013-10-08| 100.0| TIERS| |...|FO26016| 59270|1967-01-01| 1987-02-01| 100.0|VOL_INCENDIE| |...|FI19107| 77100|1988-09-27| 2009-09-13| 105.0|TOUS_RISQUES| |...|RE07307| 69100|1984-08-15| 2007-04-20| 50.0| TIERS| |...|FO26016| 59270|1967-01-07| 1987-02-01| 105.0|TOUS_RISQUES| +---+-------+------------+----------+------------+---------+------------+

Page 7: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Steps

1. Preprocessing1. Find potential duplicates2. Feature engineering

2. Manual labeling of a sample

3. Machine Learning to make predictions on the rest of the records

Page 8: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Prototype

• Crafted by a Data Scientist• Not architectured, not versioned, not unit tested…→ Not ready for production

• Spark, but a lot of Spark SQL (data processing)

• Machine Learning in Python (Scikit Learn)

→ Objective: industrialization of the code

Page 9: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Preprocessing

Page 10: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Data (CSV) + Schema (JSON)

Inputs

000010;Jose;Lester;10/10/1970 000011;José;Lester;10/10/1970 000012;Tyler;Hunt;12/12/1972 000013;Tiler;Hunt;25/12/1972 000014;Patrick;Andrews;1973-12-13

{ "tableSchemaBeforeSelection": [ { "name": "ID", "typeField": "StringType", "hardJoin": false }, { "name": "name", "typeField": "StringType", "hardJoin": true, "cleaning": "digitLetter", "listFeature": [ "scarcity" ], "listDistance": [ "equality", "soundLike" ] }, ...

Page 11: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Spark CSV module → DataFrameDon’t use type inference

Data loading

+------+-------+-------+----------+ | ID| name|surname| birthDt| +------+-------+-------+----------+ |000010| Jose| Lester|10/10/1970| |000011| José| Lester|10/10/1970| |000012| Tyler| Hunt|12/12/1972| |000013| Tiler| Hunt|25/12/1972| |000014|Patrick|Andrews|1970-10-10| +------+-------+-------+----------+

Page 12: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Parsing of dates, numbers…

• Cleaning of strings

Data cleansing

+------+-------+-------+----------+ | ID| name|surname| birthDt| +------+-------+-------+----------+ |000010| jose| lester|1970-10-10| |000011| jose| lester|1970-10-10| |000012| tyler| hunt|1972-12-12| |000013| tiler| hunt|1972-12-25| |000014|patrick|andrews| null| +------+-------+-------+----------+

Page 13: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Convert strings to phonetics (Beider-Morse)

• …

Feature calculation

+------+-------+-------+----------+--------------------+ | ID| name|surname| birthDt| BMencoded_name| +------+-------+-------+----------+--------------------+ |000010| jose| lester|1970-10-10|ios|iosi|ioz|iozi...| |000011| jose| lester|1970-10-10|ios|iosi|ioz|iozi...| |000012| tyler| hunt|1972-12-12| tilir| |000013| tiler| hunt|1972-12-25| tQlir|tili|tilir| |000014|patrick|andrews| null|pYtrQk|pYtrik|pat...| +------+-------+-------+----------+--------------------+

Page 14: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Auto-join (more on that later…)

Find potential duplicates

+------+------+---------+...+------+------+---------+... | ID_1|name_1|surname_1|...| ID_2|name_2|surname_2|... +------+------+---------+...+------+------+---------+... |000010| jose| lester|...|000011| jose| lester|... |000012| tyler| hunt|...|000013| tiler| hunt|... +------+------+---------+...+------+------+---------+...

Page 15: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Several distance algorithms:• Levenshtein distance, date difference…

Distance calculation

+------+...+------+...+-------------+--------------+...+----------------+ | ID_1|...| ID_2|...|equality_name|soundLike_name|...|dateDiff_birthDt| +------+...+------+...+-------------+--------------+...+----------------+ |000010|...|000011|...| 0.0| 0.0|...| 0.0| |000012|...|000013|...| 1.0| 0.0|...| 13.0| +------+...+------+...+-------------+--------------+...+----------------+

Page 16: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• Standardization of distances only

• Vectorization (2 vectors)

Standardization / vectorization

+------+------+---------+----------+------+------+---------+----------+------------+--------------+ | ID_1|name_1|surname_1| birthDt_1| ID_2|name_2|surname_2| birthDt_2| distances|other_features| +------+------+---------+----------+------+------+---------+----------+------------+--------------+ |000010| jose| lester|1970-10-10|000011| jose| lester|1970-10-10|[0.0,0.0,...| [2.0,2.0,...| |000012| tyler| hunt|1972-12-12|000013| tiler| hunt|1972-12-25|[1.0,1.0,...| [1.0,2.0,...| +------+------+---------+----------+------+------+---------+----------+------------+--------------+

Page 17: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

SparkSQL → DataFrames

Page 18: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

From SQL…• Generated SQL requests

• Hard to maintain (especially as regards to UDFs)

val cleaningRequest = tableSchema.map(x => { x.CleaningFuction match { case (Some(a), _) => a + "(" + x.name + ") as " + x.name case _ => x.name } }).mkString(", ")

val cleanedTable = sqlContext.sql("select " + cleaningRequest + " from " + tableName) cleanedTable.registerTempTable(schema.tableName + "_cleaned")

Page 19: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

… to DataFrames• DataFrame primitives

• More work done by the Scala compiler

val cleanedDF = tableSchema.filter(_.cleaning.isDefined).foldLeft(df) { case (df, field) => val udf: UserDefinedFunction = ... // get the cleaning UDF df.withColumn(field.name + "_cleaned", udf.apply(df(field.name))) .drop(field.name) .withColumnRenamed(field.name + "_cleaned", field.name) }

Page 20: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Unit testing

Page 21: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Unit testing

• Scalatest + Scoverage

• Coverage of all the data processing operations

Page 22: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

val resDF = schema.cleanTable(rows)

"The cleaning process" should "clean text fields" in { val res = resDF.select("ID", "name", "surname").collect() val expected = Array( Row("000010", "jose", "lester"), Row("000011", "jose", "lester ea"), Row("000012", "jose", "lester") ) res should contain theSameElementsAs expected} "The cleaning process" should "parse dates" in { ...

Comparison of Row objects000010;Jose;Lester;10/10/1970000011;Jose =-+;Lester éà;10/10/1970000012;Jose;Lester;invalid date

Page 23: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Shared SparkContext• Don’ts:

• Use one SparkContext per class of tests → multiple contexts• Setup / tear down the SparkContext for each test → slow tests

• Do’s:• Use a shared SparkContext

object SparkTestContext { val conf = new SparkConf() .setAppName("deduplication-tests") .setMaster("local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) }

Page 24: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

spark-testing-base

• Holden Karau’s spark-testing-base library• https://github.com/holdenk/spark-testing-base

• Provides:• Shared SparkContext and SQLContext• Comparison of RDDs, DataFrames, DataSets• Mock data generators

Page 25: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Matching potential duplicates

Page 26: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Join strategy

• For record linkage, first merge the two sources

• Then auto-join

Prospects New clients

Duplicate

Page 27: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Join - Volume of data

• Input: 1M records

• Cartesian product: 1000 B records

→ Find an appropriate join condition0

25

50

75

100

Page 28: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Join condition• Multiples join on 2 fields

• Equality of values or custom condition (UDF)• Union between all the intermediate results

• E.g. with fields name, surname, birth_date:df1.join(df2, (df1("ID_1") < df2("ID_2")) && (df1("name_1") === df2("name_2")) && (soundLike(df1("surname_1"), df2("surname_2")))

df1.join(df2, (df1("ID_1") < df2("ID_2")) && (df1("name_1") === df2("name_2")) && (df1("birth_date_1") === df2("birth_date_2")))

df1.join(df2, (df1("ID_1") < df2("ID_2")) && (soundLike(df1("surname_1"), df2("surname_2"))) && (df1("birth_date_1") === df2("birth_date_2")))

UNION

Page 29: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

DataFrames extension

Page 30: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• 3 types of columns

DataFrames extension

+------+...+------+...+-------------+--------------+...+----------------+ | ID_1|...| ID_2|...|equality_name|soundLike_name|...|dateDiff_birthDt| +------+...+------+...+-------------+--------------+...+----------------+ |000010|...|000011|...| 0.0| 0.0|...| 0.0| |000012|...|000013|...| 1.0| 0.0|...| 13.0| +------+...+------+...+-------------+--------------+...+----------------+

Data DistancesNon-distance features

Page 31: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

• DataFrame columns have a name and a data type• DataFrameExt = DataFrame + metadata over columns

DataFrames extension

case class OutputColumn(name: String, columnType: ColumnType) class DataFrameExt(val df: DataFrame, val outputColumns: Seq[OutputColumn]) {

def show() = df.show()

def drop(colName: String): DataFrameExt = ...

def withColumn(colName: String, col: Column, columnType: ColumnType): DataFrameExt = ...

...

Page 32: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Labeling

Page 33: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Labeling

• Manual operation• Is this a duplicate? → Yes / No

• Performed on a sample of the potential duplicates• Between 1000 and 10 000 records

Page 34: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Labeling

Page 35: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Predictions

Page 36: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Predictions

• Machine Learning• Random Forests• (Gradient Boosting Trees also give good results)

• Training on the potential duplicates labeled by hand

• Predictions on the potential duplicates not labeled by hand

Page 37: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Predictions

• Sample: 1000 records• Training set: 800 records• Test set: 200 records

• Results• True positives: 53 • False positives: 2 • True negatives: 126• False negatives: 5

→ Found 53 duplicates on the 58 expected (53+5) and only 2 errors

•Precision ≈ 93%

•Recall ≈ 91%

Page 38: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Summary&

Conclusion

Page 39: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Summary

✓ Single engine for Record Linkage and Deduplication

✓ Machine Learning → Specific rules for each dataset

✓ Higher identification of matches• Previously ~50% → Now ~90%

Page 40: Record Linkage, a real use case with Spark ML · df1.join(df2, (df1("ID_1") < df2 ... • Performed on a sample of the potential duplicates ... Record Linkage, a real use case with

Thank you!@aseigneurin