Decipher RasterΒΆ

The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.

 10 import binascii
 11 import struct
 12
 13 import pytest
 14 from sqlalchemy import Column
 15 from sqlalchemy import Integer
 16 from sqlalchemy import MetaData
 17 from sqlalchemy.ext.declarative import declarative_base
 18
 19 from geoalchemy2 import Raster
 20 from geoalchemy2 import WKTElement
 21
 22 # Tests imports
 23 from tests import test_only_with_dialects
 24
 25 metadata = MetaData()
 26 Base = declarative_base(metadata=metadata)
 27
 28
 29 class Ocean(Base):
 30     __tablename__ = 'ocean'
 31     id = Column(Integer, primary_key=True)
 32     rast = Column(Raster)
 33
 34     def __init__(self, rast):
 35         self.rast = rast
 36
 37
 38 def _format_e(endianness, struct_format):
 39     return _ENDIANNESS[endianness] + struct_format
 40
 41
 42 def wkbHeader(raw):
 43     # Function to decipher the WKB header
 44     # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
 45
 46     header = {}
 47
 48     header['endianness'] = struct.unpack('b', raw[0:1])[0]
 49
 50     e = header['endianness']
 51     header['version'] = struct.unpack(_format_e(e, 'H'), raw[1:3])[0]
 52     header['nbands'] = struct.unpack(_format_e(e, 'H'), raw[3:5])[0]
 53     header['scaleX'] = struct.unpack(_format_e(e, 'd'), raw[5:13])[0]
 54     header['scaleY'] = struct.unpack(_format_e(e, 'd'), raw[13:21])[0]
 55     header['ipX'] = struct.unpack(_format_e(e, 'd'), raw[21:29])[0]
 56     header['ipY'] = struct.unpack(_format_e(e, 'd'), raw[29:37])[0]
 57     header['skewX'] = struct.unpack(_format_e(e, 'd'), raw[37:45])[0]
 58     header['skewY'] = struct.unpack(_format_e(e, 'd'), raw[45:53])[0]
 59     header['srid'] = struct.unpack(_format_e(e, 'i'), raw[53:57])[0]
 60     header['width'] = struct.unpack(_format_e(e, 'H'), raw[57:59])[0]
 61     header['height'] = struct.unpack(_format_e(e, 'H'), raw[59:61])[0]
 62
 63     return header
 64
 65
 66 def read_band(data, offset, pixtype, height, width, endianness=1):
 67     ptype, _, psize = _PTYPE[pixtype]
 68     pix_data = data[offset + 1: offset + 1 + width * height * psize]
 69     band = [
 70         [
 71             struct.unpack(_format_e(endianness, ptype), pix_data[
 72                 (i * width + j) * psize: (i * width + j + 1) * psize
 73             ])[0]
 74             for j in range(width)
 75         ]
 76         for i in range(height)
 77     ]
 78     return band
 79
 80
 81 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
 82     import numpy as np  # noqa
 83     _, dtype, psize = _PTYPE[pixtype]
 84     dt = np.dtype(dtype)
 85     dt = dt.newbyteorder(_ENDIANNESS[endianness])
 86     band = np.frombuffer(data, dtype=dtype,
 87                          count=height * width, offset=offset + 1)
 88     band = (np.reshape(band, ((height, width))))
 89     return band
 90
 91
 92 _PTYPE = {
 93     0: ['?', '?', 1],
 94     1: ['B', 'B', 1],
 95     2: ['B', 'B', 1],
 96     3: ['b', 'b', 1],
 97     4: ['B', 'B', 1],
 98     5: ['h', 'i2', 2],
 99     6: ['H', 'u2', 2],
100     7: ['i', 'i4', 4],
101     8: ['I', 'u4', 4],
102     10: ['f', 'f4', 4],
103     11: ['d', 'f8', 8],
104 }
105
106 _ENDIANNESS = {
107     0: '>',
108     1: '<',
109 }
110
111
112 def wkbImage(raster_data, use_numpy=False):
113     """Function to decipher the WKB raster data"""
114
115     # Get binary data
116     raw = binascii.unhexlify(raster_data)
117
118     # Read header
119     h = wkbHeader(bytes(raw))
120     e = h["endianness"]
121
122     img = []  # array to store image bands
123     offset = 61  # header raw length in bytes
124     band_size = h['width'] * h['height']  # number of pixels in each band
125
126     for i in range(h['nbands']):
127         # Determine pixtype for this band
128         pixtype = struct.unpack(_format_e(e, 'b'), raw[offset: offset + 1])[0] - 64
129
130         # Read data with either pure Python or Numpy
131         if use_numpy:
132             band = read_band_numpy(
133                 raw, offset, pixtype, h['height'], h['width'])
134         else:
135             band = read_band(
136                 raw, offset, pixtype, h['height'], h['width'])
137
138         # Store the result
139         img.append(band)
140         offset = offset + 2 + band_size
141
142     return img
143
144
145 @test_only_with_dialects("postgresql")
146 class TestDecipherRaster():
147
148     @pytest.mark.parametrize("pixel_type", [
149         '1BB',
150         '2BUI',
151         '4BUI',
152         '8BSI',
153         '8BUI',
154         '16BSI',
155         '16BUI',
156         '32BSI',
157         '32BUI',
158         '32BF',
159         '64BF'
160     ])
161     def test_decipher_raster(self, pixel_type, session, conn):
162         """Create a raster and decipher it"""
163         metadata.drop_all(conn, checkfirst=True)
164         metadata.create_all(conn)
165
166         # Create a new raster
167         polygon = WKTElement('POLYGON((0 0,1 1,0 1,0 0))', srid=4326)
168         o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
169         session.add(o)
170         session.flush()
171
172         # Decipher data from each raster
173         image = wkbImage(o.rast.data)
174
175         # Define expected result
176         expected = [
177             [0, 1, 1, 1, 1],
178             [1, 1, 1, 1, 1],
179             [0, 1, 1, 1, 0],
180             [0, 1, 1, 0, 0],
181             [0, 1, 0, 0, 0],
182             [0, 0, 0, 0, 0]
183         ]
184
185         # Check results
186         band = image[0]
187         assert band == expected

Gallery generated by Sphinx-Gallery