Skip to content

sgn.sources

Source elements for generating data streams.

New classes need not be subclassed from DequeSource, but should at least be ultimately a subclass of SourceElement.

DequeSource dataclass

Bases: IterSource

A source element that has one double-ended-queue (deque) per source pad.

End of stream is controlled per pad by eos_on_empty: when it is True for a pad, EOS is signaled as soon as that pad's deque is empty.

Parameters:

Name Type Description Default
iters Optional[dict[str, Iterable[Any]]]

dict[str, deque], a mapping of source pads to deques, where the key is the pad name and the value is the deque

None
eos_on_empty Union[dict[str, bool], bool]

Union[dict[str, bool], bool], default True, a mapping of source pads to boolean values, where the key is the pad name and the value is the boolean. If a bool is given, the value is applied to all pads. If True, EOS is signaled when the deque is empty.

True
Source code in sgn/sources.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
@dataclass
class DequeSource(IterSource):
    """A source element that has one double-ended-queue (deque) per source pad.

    Each iterable given in ``iters`` is copied into a new ``collections.deque``
    (see ``_coerce_iterator``). Values are taken from the right-hand end of
    that deque with ``deque.pop()``, so an initial ``[1, 2, 3]`` is emitted as
    ``3, 2, 1``. To get first-in-first-out behaviour, add new values with
    ``appendleft`` on the deque returned by the ``deques`` property.

    End of stream is controlled per pad by ``eos_on_empty``: when it is True
    for a pad, EOS is signaled as soon as that pad's deque is empty. Because
    an empty deque is reported as ``None``, a stored ``None`` value is
    indistinguishable from an empty deque.

    Args:
        iters:
            dict[str, deque], a mapping of source pads to deques, where the
            key is the pad name and the value is the deque
        eos_on_empty:
            Union[dict[str, bool], bool], default True, a mapping of source
            pads to boolean values, where the key is the pad name and the value
            is the boolean. If a bool is given, the value is applied to all
            pads. If True, EOS is signaled when the deque is empty.
    """

    def _coerce_iterator(self, iterable):
        """Copy ``iterable`` into a brand-new deque.

        This always builds a fresh ``deque``, even when ``iterable`` already
        is one. Later changes to the caller's original object are therefore
        not seen by this source; mutate ``self.deques[pad_name]`` instead.

        Args:
            iterable:
                Iterable, the values to copy

        Returns:
            deque, a new deque holding the values of ``iterable``
        """
        return deque(iterable)

    def _get_value(self, dq):
        """Pop the next value from the right-hand end of ``dq``.

        Args:
            dq:
                deque, the deque to take the value from

        Returns:
            Any, the popped value, or None if ``dq`` is empty
        """
        try:
            return dq.pop()
        except IndexError:
            # An empty deque is reported as None. IterSource.new treats a None
            # value as "no data" when deciding whether to set EOS.
            return None

    @property
    def deques(self) -> dict[str, deque]:
        """The per-pad deques; a more explicit alias for ``iters``."""
        assert isinstance(self.iters, dict)
        return self.iters

deques property

Get the iters property with more explicit alias.

IterSource dataclass

Bases: SourceElement

A source element that has one iterable per source pad.

End of stream is controlled per pad by eos_on_empty: when it is True for a pad, EOS is signaled as soon as that pad's iterator is exhausted.

Parameters:

Name Type Description Default
iters Optional[dict[str, Iterable[Any]]]

dict[str, Iterable[Any]], a mapping of source pads to iterables, where the key is the pad name and the value is the Iterable. These will be coerced to iterators, so they can be any iterable type.

None
eos_on_empty Union[dict[str, bool], bool]

Union[dict[str, bool], bool], default True, a mapping of source pads to boolean values, where the key is the pad name and the value is the boolean. If a bool is given, the value is applied to all pads. If True, EOS is signaled when the iterator is empty.

True
Source code in sgn/sources.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
@dataclass
class IterSource(SourceElement):
    """A source element that has one iterable per source pad.

    Each pad's iterable is coerced to an iterator. Every call to ``new`` for a
    pad takes the next value from that pad's iterator. End of stream is
    controlled per pad by ``eos_on_empty``: when it is True for a pad, EOS is
    signaled once that pad's iterator is exhausted.

    Exhaustion is detected by the value ``None`` (see ``new``). An iterable
    that yields ``None`` is therefore treated as empty at that point.

    Args:
        iters:
            dict[str, Iterable[Any]], a mapping of source pads to iterables,
            where the key is the pad name and the value is the Iterable. These
            will be coerced to iterators, so they can be any iterable type.
        eos_on_empty:
            Union[dict[str, bool], bool], default True, a mapping of source
            pads to boolean values, where the key is the pad name and the value
            is the boolean. If a bool is given, the value is applied to all
            pads. If True, EOS is signaled when the iterator is empty.
        frame_factory:
            Callable, called as ``frame_factory(EOS=..., data=...)`` to build
            every outgoing Frame. Defaults to ``Frame``.
    """

    iters: Optional[dict[str, Iterable[Any]]] = None
    eos_on_empty: Union[dict[str, bool], bool] = True
    frame_factory: Callable = Frame

    def __post_init__(self):
        """Post-init setup and validation for the IterSource element.

        The steps run in this order:

        1. Coerce the per-pad iterables to iterators.
        2. Expand a scalar ``eos_on_empty`` to every pad.
        3. Check both mappings against ``source_pads``.

        Raises:
            ValueError: if ``iters`` or ``eos_on_empty`` does not have exactly
                one entry per existing source pad.
        """
        super().__post_init__()
        # Normalisation must run before validation: the validators expect
        # both mappings to already be dicts keyed by pad name.
        self._setup_iters()
        self._setup_eos_on_empty()
        self._validate_iters()
        self._validate_eos_on_empty()

    def _setup_iters(self):
        """Replace ``iters`` with a dict of per-pad iterators.

        When ``iters`` is None, every source pad gets an empty iterable.
        """
        if self.iters is None:
            self.iters = {
                pad.name: self._coerce_iterator([]) for pad in self.source_pads
            }
        else:
            self.iters = {
                name: self._coerce_iterator(iterable)
                for name, iterable in self.iters.items()
            }

    def _setup_eos_on_empty(self):
        """Expand a single bool ``eos_on_empty`` into a per-pad mapping."""
        if isinstance(self.eos_on_empty, bool):
            self.eos_on_empty = {
                pad.name: self.eos_on_empty for pad in self.source_pads
            }

    def _validate_iters(self):
        """Check that ``iters`` has exactly one entry per source pad.

        Raises:
            ValueError: on a count mismatch or an unknown pad name.
        """
        if len(self.iters) != len(self.source_pads):
            raise ValueError("The number of deques must match the number of pads")

        # Build the set of valid names once rather than once per entry.
        pad_names = {pad.name for pad in self.source_pads}
        for pad_name in self.iters:
            if pad_name not in pad_names:
                raise ValueError(
                    "DequeSource has a deque for a pad that does not exist, "
                    f"got: {pad_name}, options are: {self.source_pad_names}"
                )

    def _validate_eos_on_empty(self):
        """Check that ``eos_on_empty`` has exactly one entry per source pad.

        Raises:
            ValueError: on a count mismatch or an unknown pad name.
        """
        if len(self.eos_on_empty) != len(self.source_pads):
            raise ValueError("The number of eos on empty must match the number of pads")

        pad_names = {pad.name for pad in self.source_pads}
        for pad_name in self.eos_on_empty:
            if pad_name not in pad_names:
                raise ValueError(
                    f"DequeSource has a eos on empty for a pad that does not exist, "
                    f"got: {pad_name}, options are: {self.source_pad_names}"
                )

    def _coerce_iterator(self, iterable):
        """Return an iterator over ``iterable``.

        Existing iterators are returned unchanged so that they are consumed in
        place. This includes generators, since ``Generator`` is a subclass of
        ``Iterator``. Any other iterable is wrapped with ``iter()``.

        Args:
            iterable:
                Iterable, the iterable to coerce

        Returns:
            Iterator, the iterator
        """
        if isinstance(iterable, Iterator):
            return iterable
        return iter(iterable)

    def _get_value(self, iterator):
        """Get the next value from the iterator.

        Args:
            iterator:
                Iterator, the iterator to get the value from

        Returns:
            Any, the next value from the iterator, or None once it is exhausted
        """
        return next(iterator, None)

    def update(self, pad: SourcePad):
        """Update the iterator for the pad. This is a no-op for IterSource.

        ``new`` calls this before reading each value. Subclasses that need to
        refresh the iterator, for example when reading from a file or network
        stream, should override it.

        Args:
            pad:
                SourcePad, the pad to update
        """
        pass

    def new(self, pad: SourcePad) -> Frame:
        """Produce the next Frame for ``pad``.

        The method first calls ``update`` for the pad, then takes the next
        value from the pad's iterator. EOS is set when that value is None,
        i.e. the iterator is exhausted, and ``eos_on_empty`` is True for the
        pad.

        Args:
            pad:
                SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, built by ``frame_factory`` with the value as its data
        """
        # Give subclasses a chance to refill the pad's iterator first.
        self.update(pad=pad)

        # Narrow the types: __post_init__ has turned both into dicts.
        assert isinstance(self.iters, dict)
        assert isinstance(self.eos_on_empty, dict)
        pad_iter = self.iters[pad.name]
        pad_eos_on_empty = self.eos_on_empty[pad.name]

        data = self._get_value(pad_iter)
        return self.frame_factory(EOS=data is None and pad_eos_on_empty, data=data)

__post_init__()

Post-init setup and validation for the IterSource element (inherited by DequeSource).

Source code in sgn/sources.py
150
151
152
153
154
155
156
157
def __post_init__(self):
    """Finish constructing the element.

    After the base-class post-init has run, the per-pad iterables and the
    ``eos_on_empty`` flags are normalised. Both are then checked against the
    element's source pads. This is IterSource's hook, inherited by
    DequeSource.
    """
    super().__post_init__()
    # Normalisation must precede validation: the validators expect both
    # mappings to already be dicts keyed by pad name.
    for step_name in (
        "_setup_iters",
        "_setup_eos_on_empty",
        "_validate_iters",
        "_validate_eos_on_empty",
    ):
        getattr(self, step_name)()

new(pad)

Create a new Frame on "pad" whose data is the next value from the pad's iterator. EOS is set when that value is None (the iterator is exhausted) and eos_on_empty is True for the pad.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad for which to produce a new Frame

required

Returns:

Type Description
Frame

Frame, the Frame with optional data payload

Source code in sgn/sources.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def new(self, pad: SourcePad) -> Frame:
    """Build the next Frame for ``pad`` from that pad's iterator.

    The ``update`` hook runs first. The next value is then read from the
    pad's iterator. The Frame is marked EOS only when that value is None and
    the pad's ``eos_on_empty`` flag is true.

    Args:
        pad:
            SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame with optional data payload
    """
    self.update(pad=pad)

    iters, eos_flags = self.iters, self.eos_on_empty
    assert isinstance(iters, dict)
    assert isinstance(eos_flags, dict)
    source_iter = iters[pad.name]
    eos_wanted = eos_flags[pad.name]

    value = self._get_value(source_iter)
    is_eos = value is None and eos_wanted
    return self.frame_factory(EOS=is_eos, data=value)

update(pad)

Update the iterator for the pad. This is a no-op for IterSource. For subclasses that need to update the iterator, this method should be overridden. Examples include reading from a file or network stream.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad to update

required
Source code in sgn/sources.py
235
236
237
238
239
240
241
242
243
244
def update(self, pad: SourcePad):
    """Hook run by ``new`` before each value is read for ``pad``.

    IterSource's own implementation intentionally does nothing. Subclasses
    whose data arrives incrementally, for example from a file or a network
    stream, override it to refresh the pad's iterator.

    Args:
        pad:
            SourcePad, the pad about to be read
    """
    return None

NullSource dataclass

Bases: SourceElement, SignalEOS

A source that does precisely nothing.

It is useful for testing and debugging, and will always produce empty frames (frames whose data is None).

frame_factory: Callable = Frame
wait: Optional[float] = None
num_frames: Optional[int] = None

If wait is not None the source will block for wait seconds before each new buffer, which is useful for slowing down debugging pipelines. By default this source element handles SIGINT and SIGTERM and uses them to set EOS. See SignalEOS. In order to use this feature, the pipeline must be run within the SignalEOS context manager, e.g.,

with SignalEOS() as signal_eos:
    p.run()
Source code in sgn/sources.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@dataclass
class NullSource(SourceElement, SignalEOS):
    """A source that does precisely nothing.

    Every Frame it produces carries ``data=None``. This makes it handy for
    testing and debugging pipelines.

    Attributes:
        frame_factory:
            Callable used to build each Frame, called as
            ``frame_factory(EOS=..., data=None)``. Defaults to ``Frame``.
        wait:
            Optional[float]. If not None, the number of seconds to sleep
            before producing each Frame; useful for slowing down a debugging
            pipeline.
        num_frames:
            Optional[int]. If not None, EOS is set once more than this many
            Frames have been produced. The counter is per element, so it is
            shared by all of its source pads.

    EOS is also set once SignalEOS reports a handled signal (SIGINT or
    SIGTERM by default). For that to work, the pipeline must be run inside
    the SignalEOS context manager, e.g.,

        with SignalEOS() as signal_eos:
            p.run()
    """

    frame_factory: Callable = Frame
    wait: Optional[float] = None
    num_frames: Optional[int] = None

    def __post_init__(self):
        """Initialise the base classes and reset the produced-frame counter."""
        super().__post_init__()
        self.frame_count = 0

    def new(self, pad: SourcePad) -> Frame:
        """Produce the next (empty) Frame for ``pad``.

        The method optionally sleeps for ``wait`` seconds, then increments the
        element-wide frame counter. EOS is set when a handled signal has been
        recorded, or when ``num_frames`` is set and the counter has passed it.

        Args:
            pad:
                SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, built by ``frame_factory`` with ``data=None``
        """
        if self.wait is not None:
            sleep(self.wait)
        self.frame_count += 1
        over_limit = (
            self.num_frames is not None and self.frame_count > self.num_frames
        )
        return self.frame_factory(
            EOS=self.signaled_eos() or over_limit,
            data=None,
        )

new(pad)

New Frames are created on "pad" with an instance specific count and a name derived from the pad name. EOS is set if we have surpassed the requested number of Frames.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad for which to produce a new Frame

required

Returns:

Type Description
Frame

Frame, the Frame with optional data payload

Source code in sgn/sources.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def new(self, pad: SourcePad) -> Frame:
    """Produce the next (empty) Frame for ``pad``.

    The function optionally sleeps for ``wait`` seconds, then bumps the
    element-wide ``frame_count``. EOS is set when a handled signal has been
    recorded, or when ``num_frames`` is set and the count has passed it.

    Args:
        pad:
            SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame with optional data payload
    """
    delay = self.wait
    if delay is not None:
        sleep(delay)
    self.frame_count += 1
    limit = self.num_frames
    exhausted = limit is not None and self.frame_count > limit
    return self.frame_factory(EOS=self.signaled_eos() or exhausted, data=None)

SignalEOS

This class provides global signal handling for an SGN pipeline. If you inherit it for a source element then it will capture SIGINT and SIGTERM and provide a method to check whether EOS should be flagged. See NullSource as an example.

Additionally this must be used as a context manager for executing a pipeline and disabling the signal handler after the pipeline is done, e.g.,

with SignalEOS() as signal_eos:
    p.run()
Source code in sgn/sources.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class SignalEOS:
    """
    This class provides global signal handling for an SGN pipeline.  If you
    inherit it for a source element then it will capture SIGINT and SIGTERM
    and provide a method to check whether EOS should be flagged.  See
    NullSource as an example.

    Additionally this must be used as a context manager for executing
    a pipeline and disabling the signal handler after the pipeline is done,
    e.g.,

        with SignalEOS() as signal_eos:
            p.run()

    All state lives on the class, so every instance and every subclass shares
    the same saved handlers and received signals. Contexts may be nested:
    only the outermost ``with`` installs the handlers, and only the outermost
    exit restores them and clears the received signals.
    """

    handled_signals = {signal.SIGINT, signal.SIGTERM}
    rcvd_signals: set[int] = set([])
    previous_handlers: dict[int, Callable] = {}
    # Number of currently active ``with SignalEOS()`` blocks. Without it, a
    # nested __enter__ would save our own handler over the real previous one,
    # and the original handlers could never be restored.
    _context_depth: int = 0

    @classmethod
    def signaled_eos(cls):
        """Indicate whether a signal has been received to indicate an EOS.

        Returns True if the intersection of received signals and handled
        signals is non-empty.  This can be used by developers to decide if
        EOS should be set.
        """
        return bool(cls.rcvd_signals & cls.handled_signals)

    def raise_signal(self, sig):
        """Raise a signal that has already been raised previously.

        Intended to be used if the application needs to re-raise one of the
        signals with the previous signal handler.  NOTE - this will only raise
        the signal if it had been previously raised and only within a given
        context.
        """
        # NOTE(review): while a context is active the installed handler is the
        # one set by __enter__ (``_handler``), so the re-raised signal reaches
        # that handler rather than the saved previous one. After the outermost
        # __exit__, rcvd_signals is cleared, so this becomes a no-op.
        if sig in SignalEOS.rcvd_signals:
            signal.raise_signal(sig)

    def __enter__(self):
        """Store the previous signal handlers and set up new ones for the
        handled signals (outermost context only)."""
        if SignalEOS._context_depth == 0:
            for sig in SignalEOS.handled_signals:
                SignalEOS.previous_handlers[sig] = signal.getsignal(sig)
                signal.signal(sig, _handler)
        # Count the context only after the handlers were installed
        # successfully.
        SignalEOS._context_depth += 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore the original signal handlers when the outermost context
        exits. Exceptions from the ``with`` body are not suppressed."""
        SignalEOS._context_depth = max(SignalEOS._context_depth - 1, 0)
        if SignalEOS._context_depth > 0:
            # An enclosing context is still running; keep its handlers and
            # the signals it has received.
            return
        for sig in SignalEOS.handled_signals:
            previous = SignalEOS.previous_handlers[sig]
            # signal.getsignal() returns None when the previous handler was
            # not installed from Python. signal.signal() rejects None, so fall
            # back to the default action instead of raising during cleanup.
            if previous is None:
                previous = signal.SIG_DFL
            signal.signal(sig, previous)
        SignalEOS.rcvd_signals = set([])

__enter__()

Store the previous signal handlers and setup new ones for the handled signals

Source code in sgn/sources.py
62
63
64
65
66
67
68
def __enter__(self):
    """Install the module's signal handler for every handled signal.

    The handler active before is remembered in
    ``SignalEOS.previous_handlers`` so that ``__exit__`` can put it back.

    Returns:
        self, so the object can be bound with ``with ... as``.
    """
    saved = SignalEOS.previous_handlers
    for signum in SignalEOS.handled_signals:
        saved[signum] = signal.getsignal(signum)
        signal.signal(signum, _handler)
    return self

__exit__(exc_type, exc_val, exc_tb)

Restore the original signal handlers

Source code in sgn/sources.py
70
71
72
73
74
def __exit__(self, exc_type, exc_val, exc_tb):
    """Restore the original signal handlers and forget received signals.

    Exceptions raised inside the ``with`` block are not suppressed.
    """
    for sig in SignalEOS.handled_signals:
        previous = SignalEOS.previous_handlers[sig]
        # signal.getsignal() returns None when the previous handler was not
        # installed from Python. signal.signal() rejects None, so fall back to
        # the default action instead of raising TypeError during cleanup.
        if previous is None:
            previous = signal.SIG_DFL
        signal.signal(sig, previous)
    SignalEOS.rcvd_signals = set([])

raise_signal(sig)

Raise a signal that has already been raised previously.

Intended to be used if the application needs to re-raise one of the signals with the previous signal handler. NOTE - this will only raise the signal if it had been previously raised and only within a given context.

Source code in sgn/sources.py
51
52
53
54
55
56
57
58
59
60
def raise_signal(self, sig):
    """Re-raise ``sig``, but only if it has already been received.

    This is meant for applications that want to raise one of the handled
    signals again. It does nothing unless ``sig`` is currently recorded in
    ``SignalEOS.rcvd_signals``. That set is cleared when a context exits, so
    this only has an effect within a given context.
    """
    if sig not in SignalEOS.rcvd_signals:
        return
    signal.raise_signal(sig)

signaled_eos() classmethod

Indicate whether a signal has been received to indicate an EOS.

Returns True if the intersection of received signals and handled signals is non-empty. This can be used by developers to decide if EOS should be set.

Source code in sgn/sources.py
41
42
43
44
45
46
47
48
49
@classmethod
def signaled_eos(cls):
    """Report whether any handled signal has been received.

    This is True exactly when ``cls.rcvd_signals`` and
    ``cls.handled_signals`` share at least one signal. Source elements can
    use it to decide whether to set EOS.
    """
    return not cls.rcvd_signals.isdisjoint(cls.handled_signals)