Process, transform, analyze, and report on CSV and JSON data files. Use when the user needs to filter rows, join datasets, compute aggregates, convert formats, deduplicate, or generate summary reports from tabular data. Works with any CSV, TSV, or JSON Lines file.
Install
Documentation
CSV Data Pipeline
Process tabular data (CSV, TSV, JSON, JSON Lines) using standard command-line tools and Python. No external dependencies required beyond Python 3.
When to Use
- -User provides a CSV/TSV/JSON file and asks to analyze, transform, or report on it
- -Joining, filtering, grouping, or aggregating tabular data
- -Converting between formats (CSV to JSON, JSON to CSV, etc.)
- -Deduplicating, sorting, or cleaning messy data
- -Generating summary statistics or reports
- -ETL workflows: extract from one format, transform, load into another
Quick Operations with Standard Tools
Inspect
Preview first rows
head -5 data.csv
Count rows (excluding header)
tail -n +2 data.csv | wc -l
Show column headers
head -1 data.csv
Count unique values in a column (column 3)
tail -n +2 data.csv | cut -d',' -f3 | sort -u | wc -l
Filter with awk
Filter rows where column 3 > 100
awk -F',' 'NR==1 || $3 > 100' data.csv > filtered.csv
Filter rows matching a pattern in column 2
awk -F',' 'NR==1 || $2 ~ /pattern/' data.csv > matched.csv
Sum column 4
awk -F',' 'NR>1 {sum += $4} END {print sum}' data.csv
Sort and Deduplicate
Sort by column 2 (numeric)
head -1 data.csv > sorted.csv && tail -n +2 data.csv | sort -t',' -k2 -n >> sorted.csv
Deduplicate by all columns
head -1 data.csv > deduped.csv && tail -n +2 data.csv | sort -u >> deduped.csv
Deduplicate by specific column (keep first occurrence)
awk -F',' '!seen[$2]++' data.csv > deduped.csv
Python Operations (for complex transforms)
Read and Inspect
import csv, json, sys
from collections import Counter
def read_csv(path, delimiter=','):
"""Read CSV/TSV into list of dicts."""
with open(path, newline='', encoding='utf-8') as f:
return list(csv.DictReader(f, delimiter=delimiter))
def write_csv(rows, path, delimiter=','):
"""Write list of dicts to CSV."""
if not rows:
return
with open(path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys(), delimiter=delimiter)
writer.writeheader()
writer.writerows(rows)
Quick stats
data = read_csv('data.csv')
print(f"Rows: {len(data)}")
print(f"Columns: {list(data[0].keys())}")
for col in data[0]:
non_empty = sum(1 for r in data if r[col].strip())
print(f" {col}: {non_empty}/{len(data)} non-empty")
Filter and Transform
Filter rows
filtered = [r for r in data if float(r['amount']) > 100]
Add computed column
for r in data:
r['total'] = str(float(r['price']) * int(r['quantity']))
Rename columns
renamed = [{('new_name' if k == 'old_name' else k): v for k, v in r.items()} for r in data]
Type conversion
for r in data:
r['amount'] = float(r['amount'])
r['date'] = r['date'].strip()
Group and Aggregate
from collections import defaultdict
def group_by(rows, key):
"""Group rows by a column value."""
groups = defaultdict(list)
for r in rows:
groups[r[key]].append(r)
return dict(groups)
def aggregate(rows, group_col, agg_col, func='sum'):
"""Aggregate a column by groups."""
groups = group_by(rows, group_col)
results = []
for name, group in sorted(groups.items()):
values = [float(r[agg_col]) for r in group if r[agg_col].strip()]
if func == 'sum':
agg = sum(values)
elif func == 'avg':
agg = sum(values) / len(values) if values else 0
elif func == 'count':
agg = len(values)
elif func == 'min':
agg = min(values) if values else 0
elif func == 'max':
agg = max(values) if values else 0
results.append({group_col: name, f'{func}_{agg_col}': str(agg), 'count': str(len(group))})
return results
Example: sum revenue by category
summary = aggregate(data, 'category', 'revenue', 'sum')
write_csv(summary, 'summary.csv')
Join Datasets
def inner_join(left, right, on):
"""Inner join two datasets on a key column."""
right_index = {}
for r in right:
key = r[on]
if key not in right_index:
right_index[key] = []
right_index[key].append(r)
results = []
for lr in left:
key = lr[on]
if key in right_index:
for rr in right_index[key]:
merged = {**lr}
for k, v in rr.items():
if k != on:
merged[k] = v
results.append(merged)
return results
def left_join(left, right, on):
"""Left join: keep all left rows, fill missing right with empty."""
right_index = {}
right_cols = set()
for r in right:
key = r[on]
right_cols.update(r.keys())
if key not in right_index:
right_index[key] = []
right_index[key].append(r)
right_cols.discard(on)
results = []
for lr in left:
key = lr[on]
if key in right_index:
for rr in right_index[key]:
merged = {**lr}
for k, v in rr.items():
if k != on:
merged[k] = v
results.append(merged)
else:
merged = {**lr}
for col in right_cols:
merged[col] = ''
results.append(merged)
return results
Example
orders = read_csv('orders.csv')
customers = read_csv('customers.csv')
joined = left_join(orders, customers, on='customer_id')
write_csv(joined, 'orders_with_customers.csv')
Deduplicate
def deduplicate(rows, key_cols=None):
"""Remove duplicate rows. If key_cols specified, dedupe by those columns only."""
seen = set()
unique = []
for r in rows:
if key_cols:
key = tuple(r[c] for c in key_cols)
else:
key = tuple(sorted(r.items()))
if key not in seen:
seen.add(key)
unique.append(r)
return unique
Deduplicate by email column
clean = deduplicate(data, key_cols=['email'])
Format Conversion
CSV to JSON
import json, csv
with open('data.csv', newline='', encoding='utf-8') as f:
rows = list(csv.DictReader(f))
Array of objects
with open('data.json', 'w') as f:
json.dump(rows, f, indent=2)
JSON Lines (one object per line, streamable)
with open('data.jsonl', 'w') as f:
for row in rows:
f.write(json.dumps(row) + '\n')
JSON to CSV
import json, csv
with open('data.json') as f:
rows = json.load(f)
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
JSON Lines to CSV
import json, csv
rows = []
with open('data.jsonl') as f:
for line in f:
if line.strip():
rows.append(json.loads(line))
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
all_keys = set()
for r in rows:
all_keys.update(r.keys())
writer = csv.DictWriter(f, fieldnames=sorted(all_keys))
writer.writeheader()
writer.writerows(rows)
TSV to CSV
tr '\t' ',' < data.tsv > data.csv
Data Cleaning Patterns
Fix common CSV issues
def clean_csv(rows):
"""Clean common CSV data quality issues."""
cleaned = []
for r in rows:
clean_row = {}
for k, v in r.items():
# Strip whitespace from keys and values
k = k.strip()
v = v.strip() if isinstance(v, str) else v
# Normalize empty values
if v in ('', 'N/A', 'n/a', 'NA', 'null', 'NULL', 'None', '-'):
v = ''
# Normalize boolean values
if v.lower() in ('true', 'yes', '1', 'y'):
v = 'true'
elif v.lower() in ('false', 'no', '0', 'n'):
v = 'false'
clean_row[k] = v
cleaned.append(clean_row)
return cleaned
Validate data types
def validate_rows(rows, schema):
"""
Validate rows against a schema.
schema: dict of column_name -> 'int'|'float'|'date'|'email'|'str'
Returns (valid_rows, error_rows)
"""
import re
valid, errors = [], []
for i, r in enumerate(rows):
errs = []
for col, dtype in schema.items():
val = r.get(col, '').strip()
if not val:
continue
if dtype == 'int':
try:
int(val)
except ValueError:
errs.append(f"{col}: '{val}' not int")
elif dtype == 'float':
try:
float(val)
except ValueError:
errs.append(f"{col}: '{val}' not float")
elif dtype == 'email':
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', val):
errs.append(f"{col}: '{val}' not email")
elif dtype == 'date':
if not re.match(r'^\d{4}-\d{2}-\d{2}', val):
errs.append(f"{col}: '{val}' not YYYY-MM-DD")
if errs:
errors.append({'row': i + 2, 'errors': errs, 'data': r})
else:
valid.append(r)
return valid, errors
Usage
valid, bad = validate_rows(data, {'amount': 'float', 'email': 'email', 'date': 'date'})
print(f"Valid: {len(valid)}, Errors: {len(bad)}")
for e in bad[:5]:
print(f" Row {e['row']}: {e['errors']}")
Generating Reports
Summary report as Markdown
def generate_report(data, title, group_col, value_col):
"""Generate a Markdown summary report."""
lines = [f"# {title}", f"", f"Total rows: {len(data)}", ""]
# Group summary
groups = group_by(data, group_col)
lines.append(f"## By {group_col}")
lines.append("")
lines.append(f"| {group_col} | Count | Sum | Avg | Min | Max |")
lines.append("|---|---|---|---|---|---|")
for name in sorted(groups):
vals = [float(r[value_col]) for r in groups[name] if r[value_col].strip()]
if vals:
lines.append(f"| {name} | {len(vals)} | {sum(vals):.2f} | {sum(vals)/len(vals):.2f} | {min(vals):.2f} | {max(vals):.2f} |")
lines.append("")
lines.append(f"*Generated from {len(data)} rows*")
return '\n'.join(lines)
report = generate_report(data, "Sales Summary", "category", "revenue")
with open('report.md', 'w') as f:
f.write(report)
Large File Handling
For files too large to load into memory at once:
def stream_process(input_path, output_path, transform_fn, delimiter=','):
"""Process a CSV row-by-row without loading entire file."""
with open(input_path, newline='', encoding='utf-8') as fin, \
open(output_path, 'w', newline='', encoding='utf-8') as fout:
reader = csv.DictReader(fin, delimiter=delimiter)
writer = None
for row in reader:
result = transform_fn(row)
if result is None:
continue # Skip row
if writer is None:
writer = csv.DictWriter(fout, fieldnames=result.keys(), delimiter=delimiter)
writer.writeheader()
writer.writerow(result)
Example: filter and transform in streaming fashion
def process_row(row):
if float(row.get('amount', 0) or 0) < 10:
return None # Skip small amounts
row['amount_usd'] = str(float(row['amount']) * 1.0) # Add computed field
return row
stream_process('big_file.csv', 'output.csv', process_row)
Tips
- -Always check encoding:
file -i data.csvor open withencoding='utf-8-sig'for BOM files - -For Excel exports with commas in values, the CSV module handles quoting automatically
- -Use
json.dumps(ensure_ascii=False)for international characters - -Pipe-delimited files: use
delimiter='|'in csv.reader/writer - -For very large aggregations, consider
sqlite3which Python includes:
sqlite3 :memory: ".mode csv" ".import data.csv t" "SELECT category, SUM(amount) FROM t GROUP BY category;"
Launch an agent with CSV Data Pipeline on Termo.