Block-Based Streaming
19 Jan 2015

This is it. My last blog on streaming, at least for a while. Looking back, I have been writing about the VTK pipeline and how it can be leveraged to do cool things since September. This will be a cool topic to wrap this series up with.
In the last blog, I demonstrated how to write a dataset in blocks using h5py and how to read it back with a simple reader. When writing the blocks, we also saved meta-data describing the spatial bounds of each block. We will leverage this meta-data here.
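As a reminder, that file has one HDF5 group per block (piece0, piece1, ...), and each group carries the block's spatial bounds as a bounds attribute next to its cells, points and point_data entries. A quick way to eyeball this layout is the following sketch (assuming the file is named test.h5):

import h5py

with h5py.File("test.h5", 'r') as f:
    for grp_name in f:
        # Each group holds one block and its bounding box.
        print(grp_name, f[grp_name].attrs['bounds'])

With that layout in mind, here is the reader.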
class HDF5Reader(VTKPythonAlgorithmBase):
    def __init__(self):
        VTKPythonAlgorithmBase.__init__(self,
            nInputPorts=0,
            nOutputPorts=1, outputType='vtkMultiBlockDataSet')
        self.__FileName = ""

    def RequestInformation(self, request, inInfo, outInfo):
        import re
        p = re.compile('piece([0-9]*)')

        md = vtk.vtkMultiBlockDataSet()
        f = h5py.File(self.__FileName, 'r')
        for grp_name in f:
            idx = int(p.match(grp_name).groups()[0])
            md.SetBlock(idx, None)
            bmd = md.GetMetaData(idx)
            grp = f[grp_name]
            bds = grp.attrs['bounds']
            bmd.Set(vtk.vtkDataObject.BOUNDING_BOX(), bds, 6)
        f.close()

        outInfo.GetInformationObject(0).Set(
            vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA(),
            md)
        return 1

    def RequestData(self, request, inInfo, outInfo):
        output = dsa.WrapDataObject(vtk.vtkMultiBlockDataSet.GetData(outInfo))

        import re
        p = re.compile('piece([0-9]*)')
        f = h5py.File(self.__FileName, 'r')
        if outInfo.GetInformationObject(0).Has(
                vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES()):
            uci = outInfo.GetInformationObject(0).Get(
                vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES())
            if uci is None:
                pieces = []
            else:
                pieces = ["piece%d" % num for num in uci]
        else:
            pieces = f

        nblocks = len(pieces)
        output.SetNumberOfBlocks(nblocks)

        idx = 0
        for grp_name in pieces:
            ug = vtk.vtkUnstructuredGrid()
            output.SetBlock(idx, ug)
            idx += 1
            ug = dsa.WrapDataObject(ug)
            grp = f[grp_name]
            cells = grp['cells'][:]
            locations = grp['cell_locations'][:]
            types = grp['cell_types'][:]
            ug.SetCells(types, locations, cells)
            pts = grp['points'][:]
            ug.Points = pts
            pt_arrays = grp['point_data']
            for pt_array in pt_arrays:
                array = pt_arrays[pt_array][:]
                ug.PointData.append(array, pt_array)
        f.close()
        return 1

    def SetFileName(self, fname):
        if fname != self.__FileName:
            self.Modified()
            self.__FileName = fname

    def GetFileName(self):
        return self.__FileName
The part that reads the data is almost identical to the reader in the last blog, so I won't cover it here. The interesting parts are related to meta-data and update requests. Here is the meta-data part.
 1 def RequestInformation(self, request, inInfo, outInfo):
 2     import re
 3     p = re.compile('piece([0-9]*)')
 4
 5     md = vtk.vtkMultiBlockDataSet()
 6     f = h5py.File(self.__FileName, 'r')
 7     for grp_name in f:
 8         idx = int(p.match(grp_name).groups()[0])
 9         md.SetBlock(idx, None)
10         bmd = md.GetMetaData(idx)
11         grp = f[grp_name]
12         bds = grp.attrs['bounds']
13         bmd.Set(vtk.vtkDataObject.BOUNDING_BOX(), bds, 6)
14
15     f.close()
16
17     outInfo.GetInformationObject(0).Set(
18         vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA(),
19         md)
20
21     return 1
Lines 5-13 are the meat of this function. On line 5, we create a multi-block dataset that will be used to hold the meta-data to be sent downstream. Note that this object is used for transmitting meta-data only; it contains no actual data. On line 8, we extract the block number from the name of an HDF5 group. On lines 11-12, we read the bounds for that block. This meta-data is assigned to the corresponding block's meta-data object, obtained on line 10. Finally, on lines 17-19, we set this dataset as the COMPOSITE_DATA_META_DATA(). This information entry is propagated downstream automatically by the pipeline during the RequestInformation() pass.
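By the way, you can check this meta-data from the Python shell without reading any of the heavy data. Here is a minimal sketch, assuming the reader code above and a test.h5 written as in the previous blog:

r = HDF5Reader()
r.SetFileName("test.h5")
r.UpdateInformation()  # runs the RequestInformation() pass only
md = r.GetOutputInformation(0).Get(
    vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA())
print(md.GetNumberOfBlocks())
print(md.GetMetaData(0).Get(vtk.vtkDataObject.BOUNDING_BOX()))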
Once this meta-data is propagated downstream, consumers can ask for a subset of blocks to be read during the RequestUpdateExtent() pass. This pass will likely set an UPDATE_COMPOSITE_INDICES() key, which the reader has to respond to. We will demonstrate how this key is set later. Let's first look at how the reader uses it in RequestData().
def RequestData(self, request, inInfo, outInfo):
    output = dsa.WrapDataObject(vtk.vtkMultiBlockDataSet.GetData(outInfo))

    import re
    p = re.compile('piece([0-9]*)')
    f = h5py.File(self.__FileName, 'r')
    if outInfo.GetInformationObject(0).Has(
            vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES()):
        uci = outInfo.GetInformationObject(0).Get(
            vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES())
        if uci is None:
            pieces = []
        else:
            pieces = ["piece%d" % num for num in uci]
    else:
        pieces = f
    for grp_name in pieces:
        # ...
The main difference from a standard reader is that we look at which blocks (referred to as pieces in this example, but not to be confused with pipeline pieces) are requested in order to decide what to read. This code handles 3 different cases:
- UPDATE_COMPOSITE_INDICES() is not set. We read all of the blocks (pieces == f).
- UPDATE_COMPOSITE_INDICES() is set but empty. We read nothing (pieces == []).
- UPDATE_COMPOSITE_INDICES() is set to a non-empty list. We read only what is requested (pieces == ["piece%d" % num for num in uci]).
The sketch below exercises the third case by hand.
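To see the reader respond to a request for a subset of blocks, you can set the key on the reader's output information yourself. This is a rough sketch, not a definitive recipe; the exact incantation may vary between VTK versions, but the key has to be set after UpdateInformation() and before Update():

r = HDF5Reader()
r.SetFileName("test.h5")
r.UpdateInformation()
# Ask for blocks 0 and 2 only.
r.GetOutputInformation(0).Set(
    vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES(), [0, 2], 2)
r.Update()
print(r.GetOutputDataObject(0).GetNumberOfBlocks())  # expect 2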
This is it for the reader. Now let's look at a simple streaming writer. This is very similar to the writer in the previous blog but uses blocks instead of pieces to stream.
class StreamBlocks(VTKPythonAlgorithmBase):
    def __init__(self):
        VTKPythonAlgorithmBase.__init__(self,
            nInputPorts=1, inputType='vtkMultiBlockDataSet',
            nOutputPorts=1, outputType='vtkMultiBlockDataSet')
        self.UpdateIndex = 0
        self.__FileName = ""

    def RequestInformation(self, request, inInfo, outInfo):
        info = inInfo[0].GetInformationObject(0)
        blocks = info.Get(
            vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA())
        self.NumberOfBlocks = blocks.GetNumberOfBlocks()
        return 1

    def RequestUpdateExtent(self, request, inInfo, outInfo):
        info = inInfo[0].GetInformationObject(0)
        info.Set(
            vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES(),
            [self.UpdateIndex],
            1)
        return 1

    def RequestData(self, request, inInfo, outInfo):
        if self.UpdateIndex == 0:
            self.__File = h5py.File(self.__FileName, 'w')

        info = inInfo[0].GetInformationObject(0)
        inpMB = vtk.vtkMultiBlockDataSet.GetData(info)
        if inpMB.GetBlock(0) is not None:
            inp = dsa.WrapDataObject(inpMB.GetBlock(0))
            if inp.GetNumberOfCells() > 0:
                grp = self.__File.create_group("piece%d" % self.UpdateIndex)
                grp.attrs['bounds'] = inp.GetBounds()
                grp.create_dataset("cells", data=inp.Cells)
                grp.create_dataset("cell_types", data=inp.CellTypes)
                grp.create_dataset("cell_locations", data=inp.CellLocations)
                grp.create_dataset("points", data=inp.Points)
                pdata = grp.create_group("point_data")
                for name in inp.PointData.keys():
                    pdata.create_dataset(name, data=inp.PointData[name])

        if self.UpdateIndex < self.NumberOfBlocks - 1:
            # If we are not done, ask the pipeline to re-execute us.
            self.UpdateIndex += 1
            request.Set(
                vtk.vtkStreamingDemandDrivenPipeline.CONTINUE_EXECUTING(), 1)
        else:
            # Stop execution
            request.Remove(
                vtk.vtkStreamingDemandDrivenPipeline.CONTINUE_EXECUTING())
            # Reset for next potential execution.
            self.UpdateIndex = 0
            self.__File.close()
            del self.__File
        return 1

    def SetFileName(self, fname):
        if fname != self.__FileName:
            self.Modified()
            self.__FileName = fname

    def GetFileName(self):
        return self.__FileName
If you compare this writer to the previous one, you will see that there are only a few minor differences. The biggest one is in RequestUpdateExtent():
def RequestUpdateExtent(self, request, inInfo, outInfo):
    info = inInfo[0].GetInformationObject(0)
    info.Set(
        vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES(),
        [self.UpdateIndex],
        1)
    return 1
Note how the writer sets UPDATE_COMPOSITE_INDICES() rather than UPDATE_PIECE_NUMBER() to achieve streaming.
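For comparison, the piece-based writer from the previous blog did roughly the following in the same pass (a sketch from memory; self.NumberOfPieces is that writer's data member, see the previous blog for the full version):

def RequestUpdateExtent(self, request, inInfo, outInfo):
    info = inInfo[0].GetInformationObject(0)
    # Request one piece out of the total number of pieces.
    info.Set(
        vtk.vtkStreamingDemandDrivenPipeline.UPDATE_PIECE_NUMBER(),
        self.UpdateIndex)
    info.Set(
        vtk.vtkStreamingDemandDrivenPipeline.UPDATE_NUMBER_OF_PIECES(),
        self.NumberOfPieces)
    return 1

The only real change, then, is which key drives the upstream request: explicit block indices instead of a piece number.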
Let's make things a bit more interesting. Say we want to insert a planar cutter (slice) between the reader and the writer. This filter needs only a subset of the input blocks - the ones that the slice plane intersects. Since the spatial bounds of each block are available at the meta-data stage (RequestInformation()), this filter can make a smart decision about which blocks the reader needs to load. Here is such a filter.
class PlaneCutter(VTKPythonAlgorithmBase):
    def __init__(self):
        VTKPythonAlgorithmBase.__init__(self,
            nInputPorts=1, inputType='vtkMultiBlockDataSet',
            nOutputPorts=1, outputType='vtkMultiBlockDataSet')
        self.__Origin = (0, 0, 0)
        self.__Normal = (1, 0, 0)
        self.__BlockMap = {}

    def CheckBounds(self, bounds):
        # Returns True if the plane intersects the bounding box, i.e.
        # if the 8 corners do not all lie on the same side of the plane.
        pts = [(0, 0, 0), (0, 1, 0), (1, 0, 0), (0, 0, 1),
               (1, 1, 0), (1, 0, 1), (0, 1, 1), (1, 1, 1)]
        import numpy
        org = numpy.array(self.__Origin, dtype=numpy.float64)
        nrm = numpy.array(self.__Normal, dtype=numpy.float64)
        point = numpy.zeros((3,), dtype=numpy.float64)
        sign = None
        all_same = True
        for pt in pts:
            point[0] = bounds[pt[0]]
            point[1] = bounds[pt[1] + 2]
            point[2] = bounds[pt[2] + 4]
            sn = numpy.sign(numpy.dot(point - org, nrm))
            if sign is None:
                sign = sn
            if sign != sn:
                all_same = False
                break
        return not all_same

    def RequestInformation(self, request, inInfo, outInfo):
        inInfo0 = inInfo[0].GetInformationObject(0)
        metaData = inInfo0.Get(
            vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA())
        nblocks = metaData.GetNumberOfBlocks()
        self.__BlockMap = {}  # Reset in case we execute more than once.
        idx = 0
        for i in range(nblocks):
            bmd = metaData.GetMetaData(i)
            if self.CheckBounds(bmd.Get(vtk.vtkDataObject.BOUNDING_BOX())):
                self.__BlockMap[idx] = i
                idx += 1
        md = vtk.vtkMultiBlockDataSet()
        for i in self.__BlockMap.keys():
            ii = self.__BlockMap[i]
            imd = metaData.GetMetaData(ii)
            bds = imd.Get(vtk.vtkDataObject.BOUNDING_BOX())
            md.SetBlock(i, None)
            bmd = md.GetMetaData(i)
            bmd.Set(vtk.vtkDataObject.BOUNDING_BOX(), bds, 6)
        outInfo.GetInformationObject(0).Set(
            vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA(),
            md)
        return 1

    def RequestUpdateExtent(self, request, inInfo, outInfo):
        inInfo0 = inInfo[0].GetInformationObject(0)
        if outInfo.GetInformationObject(0).Has(
                vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES()):
            uci = outInfo.GetInformationObject(0).Get(
                vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES())
            if uci is None:
                uci = []
        else:
            uci = self.__BlockMap.keys()
        requestBlocks = []
        for i in uci:
            requestBlocks.append(self.__BlockMap[i])
        nblocks = len(requestBlocks)
        if nblocks == 0:
            # Special case because the Python generated code
            # does not like None instead of a NULL pointer in
            # this case.
            inInfo0.Set(
                vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES(),
                [0],
                0)
        else:
            inInfo0.Set(
                vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES(),
                requestBlocks, len(requestBlocks))
        return 1

    def RequestData(self, request, inInfo, outInfo):
        inInfo0 = inInfo[0].GetInformationObject(0)
        inpMB = vtk.vtkMultiBlockDataSet.GetData(inInfo0)
        output = vtk.vtkMultiBlockDataSet.GetData(outInfo)

        c = vtk.vtkCutter()
        p = vtk.vtkPlane()
        p.SetNormal(self.__Normal)
        p.SetOrigin(self.__Origin)
        c.SetCutFunction(p)

        idx = 0
        itr = inpMB.NewIterator()
        while not itr.IsDoneWithTraversal():
            inp = itr.GetCurrentDataObject()
            c.SetInputData(inp)
            c.Update()
            opt = c.GetOutput().NewInstance()
            opt.ShallowCopy(c.GetOutput())
            output.SetBlock(idx, opt)
            idx += 1
            itr.GoToNextItem()
        itr.UnRegister(None)
        return 1

    def SetOrigin(self, origin):
        if origin != self.__Origin:
            self.Modified()
            self.__Origin = origin

    def GetOrigin(self):
        return self.__Origin

    def SetNormal(self, normal):
        if normal != self.__Normal:
            self.Modified()
            self.__Normal = normal

    def GetNormal(self):
        return self.__Normal
This filter is more complicated than what we have seen so far, so let's break it into smaller pieces to study. First, the meta-data pass. Since this filter will ask for only a subset of the input blocks, it needs to replace the COMPOSITE_DATA_META_DATA() object with one that leaves out the blocks that will not be needed. This way, downstream filters will not ask for unnecessary blocks. The following code identifies the blocks that may be loaded.
for i in range(nblocks):
    bmd = metaData.GetMetaData(i)
    if self.CheckBounds(bmd.Get(vtk.vtkDataObject.BOUNDING_BOX())):
        self.__BlockMap[idx] = i
        idx += 1
It also builds a map from output block ids to input block ids; for example, if only input blocks 2, 5 and 7 intersect the plane, the map is {0: 2, 1: 5, 2: 7}. This map will be needed in RequestUpdateExtent(), as we will see later.
Then we create a new multi-block meta-data object as follows.
md = vtk.vtkMultiBlockDataSet()
for i in self.__BlockMap.keys():
    ii = self.__BlockMap[i]
    imd = metaData.GetMetaData(ii)
    bds = imd.Get(vtk.vtkDataObject.BOUNDING_BOX())
    md.SetBlock(i, None)
    bmd = md.GetMetaData(i)
    bmd.Set(vtk.vtkDataObject.BOUNDING_BOX(), bds, 6)
outInfo.GetInformationObject(0).Set(
    vtk.vtkCompositeDataPipeline.COMPOSITE_DATA_META_DATA(),
    md)
Fairly straightforward: using the map, we copy meta-data from the input to the output. The output meta-data now contains only the blocks that intersect the plane. There is one minor issue, however. Downstream filters will request blocks in an index space that is different from the input's index space. The good news is that we have a map to convert from one to the other. We use it in RequestUpdateExtent() as follows.
def RequestUpdateExtent(self, request, inInfo, outInfo):
    # ...
    requestBlocks = []
    for i in uci:
        requestBlocks.append(self.__BlockMap[i])
    nblocks = len(requestBlocks)
    inInfo0.Set(
        vtk.vtkCompositeDataPipeline.UPDATE_COMPOSITE_INDICES(),
        requestBlocks, len(requestBlocks))
Pretty easy. For each requested block (in uci), we ask for the corresponding input block by mapping it through the BlockMap data member. By the way, uci is computed the same way as in the reader, so it needs no further explanation.
Here you go. We can exercise these algorithms in the following ways.
r = HDF5Reader()
r.SetFileName("test.h5")
f = StreamBlocks()
f.SetInputConnection(r.GetOutputPort())
f.SetFileName("test2.h5")
f.Update()
This will produce a file identical to the input (test.h5).
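If you want to convince yourself of that, a quick sanity check with h5py might look like the following sketch (it only compares group names and bounds, not every dataset):

import h5py

with h5py.File("test.h5", 'r') as f1, h5py.File("test2.h5", 'r') as f2:
    assert set(f1.keys()) == set(f2.keys())
    for name in f1:
        assert all(f1[name].attrs['bounds'] == f2[name].attrs['bounds'])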
r = HDF5Reader()
r.SetFileName("test.h5")
c = PlaneCutter()
c.SetInputConnection(r.GetOutputPort())
c.SetOrigin((1, 0, 0))
c.Update()
print(c.GetOutputDataObject(0))
For a test.h5 created with 33 blocks, this will print a dataset with 7 blocks, showing that only 7 blocks were loaded. These are the blocks whose bounds intersect the plane.
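You can also iterate over the output to look at the individual slices; each leaf is the cut through one of the loaded blocks:

out = c.GetOutputDataObject(0)
itr = out.NewIterator()
while not itr.IsDoneWithTraversal():
    print(itr.GetCurrentDataObject().GetNumberOfCells())
    itr.GoToNextItem()
itr.UnRegister(None)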
The following pipeline, with minor modifications to the writer, would also work.
r = HDF5Reader()
r.SetFileName("test.h5")
c = PlaneCutter()
c.SetInputConnection(r.GetOutputPort())
c.SetOrigin((1, 0, 0))
f = StreamBlocks()
f.SetInputConnection(c.GetOutputPort())
f.SetFileName("test2.h5")
f.Update()
For this to work, you have to change StreamBlocks to write polydata instead of an unstructured grid (left to you as an exercise). Once that change is made, this pipeline will write 7 blocks of slices for a 33-block input dataset.
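As a hint for that exercise: the cutter produces vtkPolyData, which stores its connectivity in separate verts/lines/polys/strips arrays rather than the cells/cell_types/cell_locations triple of an unstructured grid. The writing section of RequestData() might become something like this sketch (assuming only polygons matter for a slice and that the file format changes accordingly; untested):

inp = dsa.WrapDataObject(inpMB.GetBlock(0))
if inp.GetNumberOfCells() > 0:
    grp = self.__File.create_group("piece%d" % self.UpdateIndex)
    grp.attrs['bounds'] = inp.GetBounds()
    # The dataset_adapter PolyData wrapper exposes the polygon
    # connectivity as the Polygons property.
    grp.create_dataset("polys", data=inp.Polygons)
    grp.create_dataset("points", data=inp.Points)
    pdata = grp.create_group("point_data")
    for name in inp.PointData.keys():
        pdata.create_dataset(name, data=inp.PointData[name])

A reader of this file would, of course, need a matching change.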
This is it, folks. If you read all my blogs on the VTK pipeline, you pretty much know everything necessary to put together very sophisticated pipelines to solve all kinds of problems. Please be sure to let us know on the VTK mailing list about any interesting pipelines you put together.
In the future, I will switch gears and start talking about VTK's data model as well as various ways of developing parallel algorithms.