Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 35 additions & 31 deletions python/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import psycopg2
import json
import decimal
import re

from elasticsearch import Elasticsearch
from elasticsearch import helpers
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self, PostGISConnection, ESConnection, view, sqlquerystring):
self._pgConnection = PostGISConnection
self._esConnection = ESConnection
self._view = view
self._sqlquerystring = sqlquerystring
self._sqlquerystring = re.sub(r'\s{2,}', ' ', sqlquerystring)
self._auth = get_config_params('config.ini')

def pgConnection(self):
Expand All @@ -79,12 +80,8 @@ def auth(self):
def sqlquerystring(self):
return self._sqlquerystring

def getGeoJson(self, sqlquerystring, pgConnection):
cur = pgConnection.pgConnection().cursor()
cur.execute(sqlquerystring)
rows = cur.fetchall()
def getGeoJson(self, rows, columns):
if rows:
columns = [name[0] for name in cur.description]
geomIndex = columns.index('st_asgeojson')
feature_collection = {'type': 'FeatureCollection',
'features': []}
Expand All @@ -101,6 +98,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
value = row[index]
feature['properties'][column] = value
feature_collection['features'].append(feature)

geojsonobject = json.dumps(feature_collection,
indent=2,
default=decimal_default)
Expand Down Expand Up @@ -142,33 +140,41 @@ def postgis2es(self):
sqlquerystring = self.sqlquerystring().format(
**{'limit': self.LIMIT,
'offset': self.OFFSET})
geojsonobject = self.getGeoJson(sqlquerystring, self.pgConnection())
while geojsonobject is not None:

print(sqlquerystring)
self.populateElasticSearchIndex(self.esConnection(),
geojsonobject,
self.auth(),
self.view())
self.OFFSET += self.LIMIT

sqlquerystring = self.sqlquerystring().format(
**{'limit': self.LIMIT,
'offset': self.OFFSET})
geojsonobject = self.getGeoJson(sqlquerystring,
self.pgConnection())

# Remove LIMIT and OFFSET until we decide to change all caller scripts
sqlquerystring = re.sub(r'\s+LIMIT.*', '', sqlquerystring)

print(sqlquerystring)

with self.pgConnection().pgConnection() as conn:
with conn.cursor(name='postgis2es_cursor') as cur:
cur.itersize = self.LIMIT
cur.execute(sqlquerystring)
rows = cur.fetchmany(self.LIMIT)
columns = [name[0] for name in cur.description]

count = 0
while rows:
count = count + 1
print("Rows %d-%d, %s = %s" %
(self.LIMIT * (count - 1) + 1, self.LIMIT * count,
columns[0], rows[0][0]))

geojsonobject = self.getGeoJson(rows, columns)
# print("populateElasticsearchIndex()")
self.populateElasticSearchIndex(self.esConnection(),
geojsonobject,
self.auth(),
self.view())
rows = cur.fetchmany(self.LIMIT)

return


class PostGISPointDataset(PostGISdataset):

def getGeoJson(self, sqlquerystring, pgConnection):
cur = pgConnection.pgConnection().cursor()
cur.execute(sqlquerystring)
rows = cur.fetchall()
def getGeoJson(self, rows, columns):
if rows:
columns = [name[0] for name in cur.description]
geomIndex = columns.index('st_asgeojson')
feature_collection = {'type': 'FeatureCollection',
'features': []}
Expand All @@ -187,6 +193,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
value = row[index]
feature['properties'][column] = value
feature_collection['features'].append(feature)

geojsonobject = json.dumps(feature_collection,
indent=2,
default=decimal_default)
Expand All @@ -197,12 +204,8 @@ def getGeoJson(self, sqlquerystring, pgConnection):

class PostGISTable(PostGISdataset):

def getGeoJson(self, sqlquerystring, pgConnection):
cur = pgConnection.pgConnection().cursor()
cur.execute(sqlquerystring)
rows = cur.fetchall()
def getGeoJson(self, rows, columns):
if rows:
columns = [name[0] for name in cur.description]
# geomIndex = columns.index('st_asgeojson')
feature_collection = {'type': 'FeatureCollection',
'features': []}
Expand All @@ -221,6 +224,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
value = row[index]
feature['properties'][column] = value
feature_collection['features'].append(feature)

geojsonobject = json.dumps(feature_collection,
indent=2,
default=decimal_default)
Expand Down