-
Notifications
You must be signed in to change notification settings - Fork 83
Expand file tree
/
Copy pathgrobid-client.py
More file actions
208 lines (175 loc) · 9.8 KB
/
grobid-client.py
File metadata and controls
208 lines (175 loc) · 9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import sys
import os
import io
import json
import argparse
import time
import concurrent.futures
from client import ApiClient
import ntpath
import requests
'''
This version uses the standard ProcessPoolExecutor for parallelizing the concurrent calls to the GROBID services.
Given the limits of ThreadPoolExecutor (input stored in memory, blocking Executor.map until the whole input
is acquired), it works with batches of PDF of a size indicated in the config.json file (default is 1000 entries).
We are moving from first batch to the second one only when the first is entirely processed - which means it is
slightly sub-optimal, but should scale better. However acquiring a list of million of files in directories would
require something scalable too, which is not implemented for the moment.
'''
class grobid_client(ApiClient):
def __init__(self, config_path='./config.json'):
self.config = None
self._load_config(config_path)
self.cache = []
def _load_config(self, path='./config.json'):
"""
Load the json configuration
"""
config_json = open(path).read()
self.config = json.loads(config_json)
# test if the server is up and running...
the_url = 'http://'+self.config['grobid_server']
if len(self.config['grobid_port'])>0:
the_url += ":"+self.config['grobid_port']
the_url += "/api/isalive"
r = requests.get(the_url)
status = r.status_code
if status != 200:
print('GROBID server does not appear up and running ' + str(status))
else:
print("GROBID server is up and running")
def process(self, input, output, n, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download):
batch_size_pdf = self.config['batch_size']
pdf_files = []
for (dirpath, dirnames, filenames) in os.walk(input):
for filename in filenames:
if filename.endswith('.pdf') or filename.endswith('.PDF'):
pdf_files.append(os.sep.join([dirpath, filename]))
if len(pdf_files) == batch_size_pdf:
self.process_batch(pdf_files, output, n, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download)
pdf_files = []
# last batch
if len(pdf_files) > 0:
self.process_batch(pdf_files, output, n, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download)
def factory_wrapper(self, output, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download):
return lambda item: self.process_pdf(item, output, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download)
def process_batch(self, pdf_files, output, n, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download):
print(len(pdf_files), "PDF files to process")
with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
futures = []
for pdf_file in pdf_files:
futures.append(executor.submit(self.process_pdf, pdf_file, output, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download))
for future in concurrent.futures.as_completed(futures):
self.cache.append(future.result())
def process_pdf(self, pdf_file, output, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download):
# check if TEI file is already produced
# we use ntpath here to be sure it will work on Windows too
pdf_file_name = ntpath.basename(pdf_file)
if output is not None:
filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + '.tei.xml')
else:
filename = os.path.join(ntpath.dirname(pdf_file), os.path.splitext(pdf_file_name)[0] + '.tei.xml')
if not force and os.path.isfile(filename):
print(filename, "already exist, skipping... (use --force to reprocess pdf input files)")
return
print(pdf_file)
files = {
'input': (
pdf_file,
open(pdf_file, 'rb'),
'application/pdf',
{'Expires': '0'}
)
}
the_url = 'http://'+self.config['grobid_server']
if len(self.config['grobid_port'])>0:
the_url += ":"+self.config['grobid_port']
the_url += "/api/"+service
# set the GROBID parameters
the_data = {}
if generateIDs:
the_data['generateIDs'] = '1'
if consolidate_header:
the_data['consolidateHeader'] = '1'
if consolidate_citations:
the_data['consolidateCitations'] = '1'
if include_raw_citations:
the_data['includeRawCitations'] = '1'
if include_raw_affiliations:
the_data['includeRawAffiliations'] = '1'
if teiCoordinates:
the_data['teiCoordinates'] = self.config['coordinates']
if download:
the_data['download'] = '1'
res, status = self.post(
url=the_url,
files=files,
data=the_data,
headers={'Accept': 'text/plain'}
)
if status == 503:
time.sleep(self.config['sleep_time'])
return self.process_pdf(pdf_file, output, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download)
elif status != 200:
print('Processing failed with error ' + str(status))
else:
if download:
# writing TEI file
try:
with io.open(filename,'w',encoding='utf8') as tei_file:
tei_file.write(res.text)
except OSError:
print ("Writing resulting TEI XML file %s failed" % filename)
pass
else:
print("Saving to cache")
return (pdf_file_name, pdf_file, res.text)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description = "Client for GROBID services")
parser.add_argument("service", help="one of [processFulltextDocument, processHeaderDocument, processReferences]")
parser.add_argument("--input", default=None, help="path to the directory containing PDF to process")
parser.add_argument("--output", default=None, help="path to the directory where to put the results (optional)")
parser.add_argument("--config", default="./config.json", help="path to the config file, default is ./config.json")
parser.add_argument("--n", default=10, help="concurrency for service usage")
parser.add_argument("--generateIDs", action='store_true', help="generate random xml:id to textual XML elements of the result files")
parser.add_argument("--consolidate_header", action='store_true', help="call GROBID with consolidation of the metadata extracted from the header")
parser.add_argument("--consolidate_citations", action='store_true', help="call GROBID with consolidation of the extracted bibliographical references")
parser.add_argument("--include_raw_citations", action='store_true', help="call GROBID requesting the extraction of raw citations")
parser.add_argument("--include_raw_affiliations", action='store_true', help="call GROBID requestiong the extraciton of raw affiliations")
parser.add_argument("--force", action='store_true', help="force re-processing pdf input files when tei output files already exist")
parser.add_argument("--teiCoordinates", action='store_true', help="add the original PDF coordinates (bounding boxes) to the extracted elements")
parser.add_argument("--download", action='store_true', help="1 to download the XML files, 0 to return them locally")
args = parser.parse_args()
input_path = args.input
config_path = args.config
output_path = args.output
n =10
if args.n is not None:
try:
n = int(args.n)
except ValueError:
print("Invalid concurrency parameter n:", n, "n = 10 will be used by default")
pass
# if output path does not exist, we create it
if output_path is not None and not os.path.isdir(output_path):
try:
print("output directory does not exist but will be created:", output_path)
os.makedirs(output_path)
except OSError:
print ("Creation of the directory %s failed" % output_path)
else:
print ("Successfully created the directory %s" % output_path)
service = args.service
generateIDs = args.generateIDs
consolidate_header = args.consolidate_header
consolidate_citations = args.consolidate_citations
include_raw_citations = args.include_raw_citations
include_raw_affiliations = args.include_raw_affiliations
force = args.force
teiCoordinates = args.teiCoordinates
download = args.download
client = grobid_client(config_path=config_path)
start_time = time.time()
client.process(input_path, output_path, n, service, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, force, teiCoordinates, download)
runtime = round(time.time() - start_time, 3)
print("runtime: %s seconds " % (runtime))