Sets the output of the streaming query to be processed using the provided writer. The processing logic can be specified as a function that takes a row as input, or as an object with process(row) and optional open(partition_id, epoch_id) and close(error) methods.
Syntax
```python
foreach(f)
```
Parameters
| Parameter | Type | Description |
|---|---|---|
| `f` | callable or object | A function that takes a `Row` as input, or an object with a `process(row)` method and optional `open(partition_id, epoch_id)` and `close(error)` methods. |
Returns
DataStreamWriter
Notes
The provided object must be serializable. Any initialization for writing data (for example, opening a connection) should be done inside open(), not at construction time.
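As a minimal sketch of this rule, the writer below keeps only plain data in its constructor and opens its output inside `open()`. The class name `FileWriter` and the per-partition file naming are illustrative, not part of the API:

```python
import os

class FileWriter:
    """Writes each row's string form to one file per partition and epoch.

    The file handle is created in open(), not in __init__, so the object
    holds only plain data when Spark serializes it and ships it to executors.
    """
    def __init__(self, out_dir):
        self.out_dir = out_dir  # plain data only; safe to serialize
        self._fh = None

    def open(self, partition_id, epoch_id):
        # Initialization happens here, after deserialization on the executor.
        path = os.path.join(self.out_dir, "part-%d-%d.txt" % (partition_id, epoch_id))
        self._fh = open(path, "w")
        return True  # True means: go on to process the rows of this partition

    def process(self, row):
        self._fh.write(str(row) + "\n")

    def close(self, error):
        # Called whether or not an error occurred; release resources here.
        if self._fh is not None:
            self._fh.close()
```

It would be attached the same way as any other writer object, for example `df.writeStream.foreach(FileWriter("/tmp/out")).start()`.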
Examples
```python
import time

df = spark.readStream.format("rate").load()
```
Process each row using a function:
```python
def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
```
Process each row using an object with open, process, and close methods:
```python
class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()
```