A base class for simplified streaming data source readers.
Compared to DataSourceStreamReader, SimpleDataSourceStreamReader doesn't require planning data partitions. The read() method allows reading data and planning the latest offset at the same time.
Because SimpleDataSourceStreamReader reads records in the Spark driver to determine the end offset of each batch without partitioning, it is only suited for lightweight use cases where input rate and batch size are small. Use DataSourceStreamReader when read throughput is high and can't be handled by a single process.
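The contract described above can be illustrated with a plain-Python sketch of the micro-batch loop the Spark driver conceptually runs. `CounterReader` and the loop are hypothetical stand-ins for illustration only; they are not part of the pyspark API.

```python
# Hypothetical stand-in for a SimpleDataSourceStreamReader subclass;
# real code would inherit from pyspark.sql.datasource.SimpleDataSourceStreamReader.
class CounterReader:
    def initialOffset(self):
        return {"index": 0}

    def read(self, start):
        # Read everything available AND compute the next end offset in one step,
        # which is what distinguishes the simplified reader.
        end = {"index": start["index"] + 1}
        records = iter([(start["index"],)])
        return records, end

    def commit(self, end):
        pass  # nothing to clean up in this sketch


# A rough approximation of the loop the driver runs for each micro-batch.
reader = CounterReader()
offset = reader.initialOffset()
batches = []
for _ in range(3):
    records, end = reader.read(offset)   # data and end offset in one call
    batches.append(list(records))
    reader.commit(end)                   # tell the source the batch is done
    offset = end                         # next batch starts where this one ended
```

Because `read()` runs entirely in the driver, every record in `batches` passed through a single process, which is why this reader is only appropriate for low-throughput sources.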
Added in Databricks Runtime 15.3
Syntax
```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...
```
Methods
| Method | Description |
|---|---|
| `initialOffset()` | Returns the initial offset of the streaming data source. A new streaming query starts reading from this offset. |
| `read(start)` | Reads all available data from the start offset and returns a tuple of an iterator of records and the end offset for the next read attempt. |
| `readBetweenOffsets(start, end)` | Reads all available data between specific start and end offsets. Invoked during failure recovery to re-read a batch deterministically. |
| `commit(end)` | Informs the source that Spark has completed processing all data for offsets less than or equal to `end`. |
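The recovery contract behind `readBetweenOffsets` can be sketched in plain Python: re-reading between a recorded start and end offset must yield exactly the records that the original `read` call produced. `ReplayableReader` below is a hypothetical illustration, not part of the pyspark API.

```python
# Hypothetical reader backed by an in-memory log; the offset is an index
# into the log, so replaying a (start, end) range is deterministic.
class ReplayableReader:
    def __init__(self, log):
        self.log = log

    def initialOffset(self):
        return {"index": 0}

    def read(self, start):
        # Consume everything currently in the log and report the end offset.
        end = {"index": len(self.log)}
        return iter(self.log[start["index"]:end["index"]]), end

    def readBetweenOffsets(self, start, end):
        # Re-read the same slice; the result must match the original batch.
        return iter(self.log[start["index"]:end["index"]])


reader = ReplayableReader([("a",), ("b",), ("c",)])
start = reader.initialOffset()
records, end = reader.read(start)
first_pass = list(records)
replay = list(reader.readBetweenOffsets(start, end))
```

If a source cannot replay a range deterministically (for example, a queue that discards messages on read), it must buffer or persist records itself so that failure recovery can reproduce the batch.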
Examples
Define a custom simplified streaming data source reader:
```python
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}

        def records():
            yield ("hello",)

        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)

        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()
```
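Offsets like the one in the example are persisted in the streaming checkpoint, so keeping them as plain dictionaries of JSON-serializable values is a safe choice. A quick sanity check (the round-trip below is an illustration, not an API requirement stated on this page):

```python
import json

# The offset shape used in the example above; verify it survives a
# JSON round trip, since checkpointing serializes offsets.
offset = {"partition-1": {"index": 0}}
round_tripped = json.loads(json.dumps(offset))
```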