Skip to content

Commit fb01e01

Browse files
committed
Add option to enable current stats updates. Updated code to allow
multiple init values by specifying init values in raw data separated by |||.
1 parent bc73e7d commit fb01e01

16 files changed

Lines changed: 272 additions & 145 deletions

dataset/dataset.py

Lines changed: 81 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import logging
2-
import time
31
from enum import Enum
2+
import logging
43
import os
5-
import time
64
import pandas as pd
5+
import time
76

87
from .dbengine import DBengine
98
from .table import Table, Source
@@ -111,7 +110,6 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
111110
df.fillna('_nan_', inplace=True)
112111

113112
# Call to store to database
114-
115113
self.raw_data.store_to_db(self.engine.engine)
116114
status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))
117115

@@ -236,14 +234,14 @@ def get_statistics(self):
236234
<count>: frequency (# of entities) where attr1: val1 AND attr2: val2
237235
"""
238236
if not self.stats_ready:
239-
self.collect_stats()
237+
self.collect_init_stats()
240238
stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats)
241239
self.stats_ready = True
242240
return stats
243241

244-
def collect_stats(self):
242+
def collect_init_stats(self):
245243
"""
246-
collect_stats calculates and memoizes: (based on current statistics)
244+
collect_init_stats calculates and memoizes: (based on RAW/INITIAL data)
247245
1. self.single_attr_stats ({ attribute -> { value -> count } })
248246
the frequency (# of entities) of a given attribute-value
249247
2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } })
@@ -257,77 +255,102 @@ def collect_stats(self):
257255
self.total_tuples = self.get_raw_data()['_tid_'].nunique()
258256
# Single attribute-value frequency
259257
for attr in self.get_attributes():
260-
self.single_attr_stats[attr] = self._get_stats_single(attr)
258+
self.single_attr_stats[attr] = self._get_init_stats_single(attr)
261259
# Co-occurrence frequency
262-
for cond_attr in self.get_attributes():
263-
self.pair_attr_stats[cond_attr] = {}
264-
for trg_attr in self.get_attributes():
265-
if trg_attr != cond_attr:
266-
self.pair_attr_stats[cond_attr][trg_attr] = self._get_stats_pair(cond_attr,trg_attr)
260+
for first_attr in self.get_attributes():
261+
self.pair_attr_stats[first_attr] = {}
262+
for second_attr in self.get_attributes():
263+
if second_attr != first_attr:
264+
self.pair_attr_stats[first_attr][second_attr] = self._get_init_stats_pair(first_attr,second_attr)
267265

268-
def _get_stats_single(self, attr):
266+
def collect_current_stats(self, attr):
269267
"""
270-
Returns a dictionary where the keys possible values for :param attr: and
271-
the values contain the frequency count of that value for this attribute.
272-
"""
273-
<<<<<<< HEAD
274-
# need to decode values into unicode strings since we do lookups via
275-
# unicode strings from Postgres
276-
return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict()
277-
=======
268+
collect_current_stats calculates and memoizes frequency and co-occurrence
269+
statistics based on the CURRENT values/data.
278270
279-
# If cell_domain has not been initialized yet, retrieve statistics
280-
# from raw data (this happens when the domain is just being setup)
281-
if not self.aux_table_exists(AuxTables.cell_domain):
282-
return self.get_raw_data()[[attr]].groupby([attr]).size()
271+
See collect_init_stats for which member variables are memoized/overwritten.
272+
Does NOT overwrite self.total_tuples.
273+
"""
274+
# Single attribute-value frequency
275+
for attr in self.get_attributes():
276+
self.single_attr_stats[attr] = self._get_current_stats_single(attr)
277+
# Co-occurrence frequency
278+
for first_attr in self.get_attributes():
279+
self.pair_attr_stats[first_attr] = {}
280+
for second_attr in self.get_attributes():
281+
if second_attr != first_attr:
282+
self.pair_attr_stats[first_attr][second_attr] = self._get_current_stats_pair(first_attr,second_attr)
283283

284+
def _get_init_stats_single(self, attr):
285+
"""
286+
_get_init_stats_single returns a dictionary where the keys are possible
287+
values for :param attr: and the values contain the frequency count in
288+
the RAW/INITIAL data of that value for this attribute.
289+
"""
290+
# We need to iterate through this in a for loop instead of groupby & size
291+
# since our values may be '|||' separated
292+
freq_count = {}
293+
for (vals,) in self.get_raw_data()[[attr]].itertuples(index=False):
294+
for val in vals.split('|||'):
295+
freq_count[val] = freq_count.get(val, 0) + 1
296+
return freq_count
297+
298+
def _get_current_stats_single(self, attr):
299+
"""
300+
_get_current_stats_single returns a dictionary where the keys are possible values
301+
for :param attr: and the values contain the frequency count in the
302+
CURRENT data of that value for this attribute.
303+
"""
284304
# Retrieve statistics on current value from cell_domain
285-
286305
df_domain = self.get_aux_table(AuxTables.cell_domain).df
287306
df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts()
288307
# We do not store attributes with only NULL values in cell_domain:
289308
# we require _nan_ in our single stats however
290309
if df_count.empty:
291-
return pd.Series(self.total_tuples, index=['_nan_'])
292-
return df_count
293-
>>>>>>> Re-compute single and co-occur stats after every EM iteration.
310+
return {'_nan_': self.total_tuples}
311+
return df_count.to_dict()
294312

295-
<<<<<<< HEAD
296-
def get_stats_pair(self, first_attr, second_attr):
297-
=======
298-
def _get_stats_pair(self, cond_attr, trg_attr):
299-
>>>>>>> Cleaned up some private functions and accesses to aux_tables.
313+
def _get_init_stats_pair(self, first_attr, second_attr):
300314
"""
301-
Returns a dictionary {first_val -> {second_val -> count } } where:
315+
_get_init_stats_pair returns a dictionary {first_val -> {second_val ->
316+
count } } where (based on RAW/INITIAL dataset):
302317
<first_val>: all possible values for first_attr
303318
<second_val>: all values for second_attr that appeared at least once with <first_val>
304319
<count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
305320
"""
306-
<<<<<<< HEAD
307-
tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count")
308-
return _dictify(tmp_df)
309-
=======
310-
# If cell_domain has not been initialized yet, retrieve statistics
311-
# from raw data (this happens when the domain is just being setup)
312-
if not self.aux_table_exists(AuxTables.cell_domain):
313-
return self.get_raw_data()[[cond_attr,trg_attr]].groupby([cond_attr,trg_attr]).size().reset_index(name="count")
314321

322+
# We need to iterate through this in a for loop instead of groupby & size
323+
# since our values may be '|||' separated
324+
cooccur_freq_count = {}
325+
for vals1, vals2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False):
326+
for val1 in vals1.split('|||'):
327+
cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {})
328+
for val2 in vals2.split('|||'):
329+
cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1
330+
return cooccur_freq_count
331+
332+
def _get_current_stats_pair(self, first_attr, second_attr):
333+
"""
334+
_get_current_stats_pair returns a dictionary {first_val -> {second_val ->
335+
count } } where (based on CURRENT dataset):
336+
<first_val>: all possible values for first_attr
337+
<second_val>: all values for second_attr that appeared at least once with <first_val>
338+
<count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
339+
"""
315340
# Retrieve pairwise statistics on current value from cell_domain
316-
317341
df_domain = self.get_aux_table(AuxTables.cell_domain).df
318342
# Filter cell_domain for only the attributes we care about
319-
df_domain = df_domain[df_domain['attribute'].isin([cond_attr, trg_attr])]
320-
# Convert to wide form so we have our :param cond_attr:
321-
# and :trg_attr: as columns along with the _tid_ column
343+
df_domain = df_domain[df_domain['attribute'].isin([first_attr, second_attr])]
344+
# Convert to wide form so we have our :param first_attr:
345+
# and :second_attr: as columns along with the _tid_ column
322346
df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value')
323347
# We do not store cells for attributes consisting of only NULL values in cell_domain.
324348
# We require this for pair stats though.
325-
if cond_attr not in df_domain.columns:
326-
df_domain[cond_attr] = '_nan_'
327-
if trg_attr not in df_domain.columns:
328-
df_domain[trg_attr] = '_nan_'
329-
return df_domain.groupby([cond_attr, trg_attr]).size().reset_index(name="count")
330-
>>>>>>> Re-compute single and co-occur stats after every EM iteration.
349+
if first_attr not in df_domain.columns:
350+
df_domain[first_attr] = '_nan_'
351+
if second_attr not in df_domain.columns:
352+
df_domain[second_attr] = '_nan_'
353+
return _dictify(df_domain.groupby([first_attr, second_attr]).size().reset_index(name="count"))
331354

332355
def get_domain_info(self):
333356
"""
@@ -366,18 +389,13 @@ def generate_inferred_values(self):
366389

367390
def generate_repaired_dataset(self):
368391
tic = time.clock()
369-
init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
370-
<<<<<<< HEAD
371-
t = self.aux_table[AuxTables.inf_values_dom]
372-
repaired_vals = _dictify(t.df.reset_index())
373-
=======
392+
records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
374393
t = self.aux_tables[AuxTables.inf_values_dom]
375-
repaired_vals = dictify(t.df.reset_index())
376-
>>>>>>> Cleaned up some private functions and accesses to aux_tables.
394+
repaired_vals = _dictify(t.df.reset_index())
377395
for tid in repaired_vals:
378396
for attr in repaired_vals[tid]:
379-
init_records[tid][attr] = repaired_vals[tid][attr]
380-
repaired_df = pd.DataFrame.from_records(init_records)
397+
records[tid][attr] = repaired_vals[tid][attr]
398+
repaired_df = pd.DataFrame.from_records(records)
381399
name = self.raw_data.name+'_repaired'
382400
self.repaired_data = Table(name, Source.DF, df=repaired_df)
383401
self.repaired_data.store_to_db(self.engine.engine)

dataset/table.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
from enum import Enum
12
import os
23
import pandas as pd
3-
from enum import Enum
44

55
class Source(Enum):
66
FILE = 1

detect/nulldetector.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def detect_noisy_cells(self):
2222
attributes = self.ds.get_attributes()
2323
errors = []
2424
for attr in attributes:
25-
tmp_df = self.df[self.df[attr].isnull()]['_tid_'].to_frame()
25+
# self.df (i.e. raw_data) has all NULL values converted to '_nan_'
26+
tmp_df = self.df[self.df[attr] == '_nan_']['_tid_'].to_frame()
2627
tmp_df.insert(1, "attribute", attr)
2728
errors.append(tmp_df)
2829
errors_df = pd.concat(errors, ignore_index=True)

detect/violationdetector.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,33 @@ def setup(self, dataset, env):
1717
self.constraints = dataset.constraints
1818

1919
def detect_noisy_cells(self):
20-
# Convert Constraints to SQL queries
20+
"""
21+
detect_noisy_cells returns all cells that are involved in a DC violation.
22+
23+
:return: pandas.DataFrame with two columns:
24+
'_tid_': TID of tuple
25+
'attribute': attribute corresponding to cell involved in DC violation
26+
"""
27+
28+
# Convert Constraints to SQL queries
29+
2130
tbl = self.ds.raw_data.name
2231
queries = []
32+
# attributes involved in a DC violation (indexed by corresponding query)
2333
attrs = []
24-
for c_key in self.constraints:
25-
c = self.constraints[c_key]
26-
q = self.to_sql(tbl, c)
34+
for constraint in self.constraints.values():
35+
# SQL query to query for TIDs involved in a DC violation for this constraint
36+
q = self.to_sql(tbl, constraint)
2737
queries.append(q)
28-
attrs.append(c.components)
38+
attrs.append(constraint.components)
2939
# Execute Queries over the DBEngine of Dataset
3040
results = self.ds.engine.execute_queries(queries)
3141

3242
# Generate final output
3343
errors = []
34-
for i in range(len(attrs)):
35-
res = results[i]
36-
attr_list = attrs[i]
37-
tmp_df = self.gen_tid_attr_output(res, attr_list)
44+
for attr_list, res in zip(attrs, results):
45+
# DataFrame with TID and attribute pairs from DC violation queries
46+
tmp_df = self._gen_tid_attr_output(res, attr_list)
3847
errors.append(tmp_df)
3948
errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True)
4049
return errors_df
@@ -79,7 +88,12 @@ def gen_mult_query(self, tbl, c):
7988
query = mult_template.substitute(table=tbl, cond1=cond1, c='', cond2=cond2)
8089
return query
8190

82-
def gen_tid_attr_output(self, res, attr_list):
91+
def _gen_tid_attr_output(self, res, attr_list):
92+
"""
93+
_gen_tid_attr_output creates a DataFrame containing the TIDs from
94+
the DC violation query results in :param res: with the attributes
95+
that were involved in the violation in :param attr_list:.
96+
"""
8397
errors = []
8498
for tuple in res:
8599
tid = int(tuple[0])

0 commit comments

Comments
 (0)