-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathcsv_writer.py
86 lines (74 loc) · 2.58 KB
/
csv_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import json
import logging
from twarc.decorators2 import FileSizeProgressBar
from more_itertools import ichunked
import dataframe_converter
# Module-level logger, obtained under the logger name "twarc".
log = logging.getLogger("twarc")
class CSVConverter:
    """
    JSON reader and CSV writer.

    Reads ``infile`` one line at a time, parses every non-blank line as JSON,
    passes the parsed objects to ``converter`` in batches of ``batch_size``
    and writes each resulting dataframe to ``outfile`` as CSV, while updating
    a progress bar based on the position in the input file.
    """

    def __init__(
        self,
        infile,
        outfile,
        converter=None,
        output_format="csv",
        batch_size=100,
        hide_progress=False,
    ):
        """
        Set up the converter and its progress bar.

        Args:
            infile: Open file-like object; must provide ``readline()``,
                ``seekable()`` and (when seekable) ``tell()``.
            outfile: Output target, passed to ``to_csv`` and to the progress bar.
            converter: Object providing a ``counts`` mapping, an
                ``output_columns`` attribute and a ``process(batch)`` method
                returning a dataframe. When ``None`` (the default) a new
                ``dataframe_converter.DataFrameConverter()`` is created.
            output_format: Stored on the instance; not consulted by this class,
                which always writes CSV.
            batch_size: Number of parsed JSON objects handed to the converter
                at a time.
            hide_progress: When true, the progress bar is disabled.
        """
        # Build the default per instance: a default evaluated in the signature
        # would be created once and shared (with its counts) by every
        # CSVConverter constructed without an explicit converter.
        if converter is None:
            converter = dataframe_converter.DataFrameConverter()
        self.infile = infile
        self.outfile = outfile
        self.converter = converter
        self.output_format = output_format
        self.batch_size = batch_size
        self.hide_progress = hide_progress
        # The bar is also disabled for non-seekable input, where no file
        # position is available to report.
        self.progress = FileSizeProgressBar(
            infile, outfile, disable=(hide_progress or not self.infile.seekable())
        )

    def _read_lines(self):
        """
        Generator yielding one parsed JSON object per non-blank input line.

        Every line read increments ``counts["lines"]``. Lines that fail to
        parse increment ``counts["parse_errors"]``, are logged and skipped.
        The progress bar is advanced to the current file position after each
        line when progress is shown and the input is seekable.
        """
        # readline() is used instead of iterating the file object because
        # text-mode files refuse tell() while they are being iterated.
        line = self.infile.readline()
        while line:
            self.converter.counts["lines"] += 1
            if line.strip():
                # Only the parse itself is guarded, so an exception raised at
                # the yield point is never mistaken for a JSON error.
                try:
                    parsed = json.loads(line)
                except Exception as ex:
                    self.converter.counts["parse_errors"] += 1
                    log.error(f"Error when trying to parse json: '{line}' {ex}")
                else:
                    yield parsed
            if not self.hide_progress and self.infile.seekable():
                # update() takes an increment, so pass the distance between
                # the current file position and what the bar already shows.
                self.progress.update(self.infile.tell() - self.progress.n)
            line = self.infile.readline()

    def _write_output(self, _df, first_batch):
        """
        Write out the dataframe chunk by chunk.

        The first batch is written with mode "w" and a header row; later
        batches are written with mode "a+" and no header.
        ``counts["rows"]`` grows by the number of rows in ``_df``.

        todo: take parameters from commandline for optional output formats.
        """
        if first_batch:
            mode, header = "w", True
        else:
            mode, header = "a+", False

        self.converter.counts["rows"] += len(_df)
        _df.to_csv(
            self.outfile,
            mode=mode,
            columns=self.converter.output_columns,
            index=False,
            header=header,
        )  # todo: (Optional) arguments for to_csv

    def process(self):
        """
        Process a file containing JSON into a CSV.

        The progress bar is closed when processing ends, including when the
        converter or the CSV write raises.
        """
        # Flag for writing header & appending to CSV file
        first_batch = True
        try:
            for batch in ichunked(self._read_lines(), self.batch_size):
                self._write_output(self.converter.process(batch), first_batch)
                first_batch = False
        finally:
            self.progress.close()