Stream: Averaging and Outliers#

This example shows how to clean up a stream of data by removing outliers and averaging the values over time.

Working path#

Set the working path to the folder where the data is stored. For now we’ll use the example data provided with particula.

The path could be anywhere on your computer. For example, if your data lives in a folder called “data”, you could set the path to: path = "U:\\data\\processing\\Campgain2023_of_aswsome\\data"
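As a minimal sketch of building such a path without hand-escaping backslashes, the standard-library pathlib module can join the parts for you. The folder names below are hypothetical and only illustrate the pattern. The result is converted to a string because the example path used in this notebook is a string.

```python
from pathlib import Path

# Hypothetical folder layout, used only for illustration.
data_root = Path.home() / "data"
campaign_path = data_root / "processing" / "data"

# Convert to a plain string, matching the string path used in this example.
path = str(campaign_path)
print(path)
```

pathlib picks the correct separator for the operating system, so the same lines work on Windows, macOS, and Linux.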

# all the imports, but we'll go through them one by one as we use them
import os
import matplotlib.pyplot as plt
from particula.data import loader_interface, settings_generator, stream_stats
from particula.data.tests.example_data.get_example_data import get_data_folder

# set the parent directory of the data folder
path = get_data_folder()
print('Path to data folder:')
print(path.rsplit('particula')[-1])
Path to data folder:
/data/tests/example_data

Load the data#

For this example we’ll use the provided example data, but you can change the path to any folder on your computer. We can then use the settings generator to describe the file format and the loader interface to load the data.

# This method uses the settings_generator module to generate the settings.
settings = settings_generator.for_general_1d_load(
    relative_data_folder='CPC_3010_data',
    filename_regex='*.csv',
    file_min_size_bytes=10,
    data_checks={
        "characters": [10, 100],
        "char_counts": {",": 4},
        "skip_rows": 0,
        "skip_end": 0,
    },
    data_column=[1, 2],
    data_header=['CPC_count[#/sec]', 'Temperature[degC]'],
    time_column=[0],
    time_format='epoch',
    delimiter=',',
    time_shift_seconds=0,
    timezone_identifier='UTC',
)

# now call the loader interface
data_stream = loader_interface.load_files_interface(
    path=path,
    settings=settings,
)
  Loading file: CPC_3010_data_20220710_Jul.csv
  Loading file: CPC_3010_data_20220709_Jul.csv
# print data stream summary
print('Stream:')
print(data_stream)
Stream:
Stream(header=['CPC_count[#/sec]', 'Temperature[degC]'], data=array([[3.3510e+04, 1.7000e+01],
       [3.3465e+04, 1.7100e+01],
       [3.2171e+04, 1.7000e+01],
       ...,
       [1.9403e+04, 1.6900e+01],
       [2.0230e+04, 1.7000e+01],
       [1.9521e+04, 1.6800e+01]]), time=array([1.65734280e+09, 1.65734281e+09, 1.65734281e+09, ...,
       1.65751559e+09, 1.65751560e+09, 1.65751560e+09]), files=[['CPC_3010_data_20220710_Jul.csv', 1078191], ['CPC_3010_data_20220709_Jul.csv', 1011254]])
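The time array in the summary holds epoch seconds (matching time_format='epoch' in the settings). As a small sketch of how to read those numbers, the standard library can convert them to UTC datetimes. The two timestamps below are assumptions read from the rounded printout above, so the true values may differ by a few seconds.

```python
from datetime import datetime, timezone

# Approximate first and last timestamps, read from the rounded printout above.
first_epoch = 1657342800
last_epoch = 1657515600

# Interpret the epoch seconds as UTC datetimes.
first_time = datetime.fromtimestamp(first_epoch, tz=timezone.utc)
last_time = datetime.fromtimestamp(last_epoch, tz=timezone.utc)

print(first_time.isoformat())
print(last_time.isoformat())

# Total span of the record in hours.
span_hours = (last_time - first_time).total_seconds() / 3600
print(span_hours, "hours")
```

With these rounded values the record starts on 2022-07-09 at 05:00 UTC and spans about 48 hours.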
# plot the data
fig, ax = plt.subplots()
ax.plot(data_stream.datetime64,
        data_stream.data[:, 0],  # data_stream.data is a 2d array, so we need
                                 # to specify which column we want to plot
        label=data_stream.header[0],
        linestyle="none",
        marker=".",)
plt.xticks(rotation=45)
ax.set_xlabel("Time (UTC)")
ax.set_ylabel(data_stream.header[0])
fig.tight_layout()  # adjust the layout before the figure is drawn
plt.show()

Average the data#

Now that we have the data loaded, we can average it over time. We’ll use the ‘particula.data.stream_stats’ module to do this. The module has a function called ‘average_std’ that takes a stream object and returns a new stream object with the averaged data and the standard deviation of the data. Because the stream time is in epoch seconds, the average_interval of 600 corresponds to 10-minute averages.

stream_averaged = stream_stats.average_std(
    stream=data_stream,
    average_interval=600,
)
stream_averaged.standard_deviation.shape
(288, 2)
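To make the idea behind interval averaging concrete, here is a minimal pure-Python sketch. It is not particula's implementation: the function name average_std_sketch, the binning convention (bins start at the first timestamp and include their left edge), and the use of the population standard deviation are all assumptions made for illustration.

```python
from statistics import fmean, pstdev


def average_std_sketch(times, values, average_interval):
    """Group values into fixed-width time bins; return bin starts, means, stds.

    Illustrative only: bins start at the first timestamp, and the
    population standard deviation is used (both are assumptions).
    """
    start = times[0]
    bins = {}
    for t, v in zip(times, values):
        # Index of the bin this sample falls into.
        index = int((t - start) // average_interval)
        bins.setdefault(index, []).append(v)

    bin_starts, means, stds = [], [], []
    for index in sorted(bins):
        bin_starts.append(start + index * average_interval)
        means.append(fmean(bins[index]))
        stds.append(pstdev(bins[index]))
    return bin_starts, means, stds


# Six samples, 200 s apart, averaged into 600 s bins.
times = [0, 200, 400, 600, 800, 1000]
values = [10.0, 20.0, 30.0, 100.0, 110.0, 120.0]
bin_starts, means, stds = average_std_sketch(times, values, average_interval=600)
print(bin_starts)  # [0, 600]
print(means)       # [20.0, 110.0]
print(stds)
```

The same arithmetic explains the shape shown above: the rounded timestamps in the stream summary span about 172,800 seconds, and 172800 / 600 = 288 intervals, one row per interval for each of the two data columns.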

Plot the averaged data#

fig, ax = plt.subplots()
ax.plot(stream_averaged.datetime64,
        stream_averaged.data[:, 0],
        label=stream_averaged.header[0],
        marker=".",)
plt.xticks(rotation=45)
ax.set_xlabel("Time (UTC)")
ax.set_ylabel(stream_averaged.header[0])
fig.tight_layout()  # adjust the layout before the figure is drawn
plt.show()

Clean up the data#

We may still see some outliers in the data. We can use the ‘particula.data.stream_stats’ module to remove them. The module has a function called ‘filtering’ that takes a stream object and returns a new stream object with the outliers removed. Here we set an upper bound with top and use drop=True so that the rows above that bound are removed.

stream_filtered = stream_stats.filtering(
    stream=data_stream,
    top=250000,
    drop=True,
)
fig, ax = plt.subplots()
ax.plot(stream_filtered.datetime64,
        stream_filtered.data[:, 0],
        label=stream_filtered.header[0],
        marker=".",)
plt.xticks(rotation=45)
ax.set_xlabel("Time (UTC)")
ax.set_ylabel(stream_filtered.header[0])
fig.tight_layout()  # adjust the layout before the figure is drawn
plt.show()
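The difference between dropping and replacing filtered points can be sketched in a few lines of plain Python. This is not particula's implementation: filter_top_sketch is a hypothetical helper, and treating a value equal to top as valid is an assumption.

```python
import math


def filter_top_sketch(times, values, top, drop=False, replace_with=None):
    """Remove or replace values above an upper bound (illustrative only)."""
    if drop:
        # Keep only the (time, value) pairs at or below the bound.
        kept = [(t, v) for t, v in zip(times, values) if v <= top]
        return [t for t, _ in kept], [v for _, v in kept]
    # Keep every time point, but overwrite the out-of-bound values.
    return list(times), [v if v <= top else replace_with for v in values]


times = [0, 1, 2]
values = [100.0, 300000.0, 200.0]

# Dropping shortens both the time and the data arrays.
dropped = filter_top_sketch(times, values, top=250000, drop=True)
print(dropped)  # ([0, 2], [100.0, 200.0])

# Replacing keeps the length and marks the outlier, here with NaN.
replaced = filter_top_sketch(times, values, top=250000, replace_with=math.nan)
print(replaced)
```

Dropping is convenient for plotting, while replacing keeps the time base unchanged, which can matter if the stream is later compared point by point with another one.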

Summary#

This example showed how to load a stream of data, average the values over time, and clean it up by removing outliers. The full documentation of the stream_stats functions used here is printed below.

help(stream_stats)
Help on module particula.data.stream_stats in particula.data:

NAME
    particula.data.stream_stats - Functions to operate on stream objects.

FUNCTIONS
    average_std(stream: particula.data.stream.Stream, average_interval: Union[float, int] = 60, new_time_array: Optional[numpy.ndarray] = None) -> particula.data.stream.StreamAveraged
        Calculate the average and standard deviation of data within a given
        'stream' object over specified intervals.
        
        This function takes a 'stream' object, which should contain time-series
        data, and computes the average and standard deviation of the data at
        intervals specified by 'average_interval'. If data.time is in seconds
        then the units of the interval are seconds (hour in hours etc). The
        results are returned as a new 'StreamAveraged' object containing the
        processed data.
        
        Args:
        - stream (object): The input stream object containing 'time' and 'data'
            arrays along with other associated metadata.
        - average_interval (float|int, optional): The time interval over which the
            averaging is to be performed.
        - new_time_array (np.ndarray, optional): An optional array of time points
            at which the average and standard deviation are computed.
            If not provided, a new time array is generated based on the start and
            end times within the 'stream.time' object.
        
        Returns:
        - StreamAveraged (object): An object of type 'StreamAveraged' containing
            the averaged data, time array, start and stop times, the standard
            deviation of the averaged data, and other metadata from the original
            'stream' object.
        
        The function checks for an existing 'new_time_array' and generates one if
        needed. It then calculates the average and standard deviation for each
        interval and constructs a 'StreamAveraged' object with the results and
        metadata from the original 'stream' object.
    
    drop_masked(stream: particula.data.stream.Stream, mask: numpy.ndarray) -> particula.data.stream.Stream
        Drop rows where mask is false, and return data stream.
        
        Args
        ----------
        stream : object
            data stream object
        mask : np.ndarray
            mask to apply to data stream
        
        Returns
        -------
        object
            stream object
    
    filtering(stream: particula.data.stream.Stream, bottom: Optional[float] = None, top: Optional[float] = None, value: Optional[float] = None, invert: Optional[bool] = False, clone: Optional[bool] = True, replace_with: Union[float, int, NoneType] = None, drop: Optional[bool] = False, header: Union[list, int, str, NoneType] = None) -> particula.data.stream.Stream
        Filters the data of the given 'stream' object based on the specified
        bounds or specific value. The filtered data can be either dropped or
        replaced with a specified value.  Note, not all parameters need to be
        specified, but at least one must be provided (top, bottom, value)
        
        Args:
        - stream (Stream): The input stream object containing 'data' and 'time'
            attributes.
        - bottom (float, optional): The lower bound for filtering data. Defaults
            to None.
        - top (float, optional): The upper bound for filtering data.
            Defaults to None.
        - value (float, optional): Specific value to filter from data.
            Defaults to None.
        - invert (bool): If True, inverts the filter criteria.
            Defaults to False.
        - clone (bool): If True, returns a copy of the 'stream' object, with
            filtered data. If False, modifies the 'stream' object in-place.
            Defaults to True.
        - replace_with (float|int, optional): Value to replace filtered-out data.
            Defaults to None.
        - drop (bool, optional): If True, filtered-out data points are dropped
            from the dataset. Defaults to False.
        - header (list, optional): The header of the data to filter on. This can
            same as calling Stream['header']
            Defaults to None.
        
        Returns:
        - Stream: The 'stream' object with data filtered as specified.
        
        If 'drop' is True, 'replace_with' is ignored and filtered data points are
        removed from the 'stream' object. Otherwise, filtered data points are
        replaced with 'replace_with' value.
        
        add specific data row to filter on
    
    remove_time_window(stream: particula.data.stream.Stream, epoch_start: Union[float, int], epoch_end: Union[float, int, NoneType] = None) -> particula.data.stream.Stream
        Remove a time window from a stream object.
        
        Args:
        - stream: The input stream object containing 'data' and 'time'
            attributes.
        - epoch_start: The start time of the time window to be
            removed.
        - epoch_end: The end time of the time window to be
            removed. If not provided, the time window is the closest time point to
            'epoch_start'.
        
        Returns:
        - Stream: The 'stream' object with the specified time window removed.

DATA
    Optional = typing.Optional
        Optional[X] is equivalent to Union[X, None].
    
    Union = typing.Union
        Union type; Union[X, Y] means either X or Y.
        
        On Python 3.10 and higher, the | operator
        can also be used to denote unions;
        X | Y means the same thing to the type checker as Union[X, Y].
        
        To define a union, use e.g. Union[int, str]. Details:
        - The arguments must be types and there must be at least one.
        - None as an argument is a special case and is replaced by
          type(None).
        - Unions of unions are flattened, e.g.::
        
            assert Union[Union[int, str], float] == Union[int, str, float]
        
        - Unions of a single argument vanish, e.g.::
        
            assert Union[int] == int  # The constructor actually returns int
        
        - Redundant arguments are skipped, e.g.::
        
            assert Union[int, str, int] == Union[int, str]
        
        - When comparing unions, the argument order is ignored, e.g.::
        
            assert Union[int, str] == Union[str, int]
        
        - You cannot subclass or instantiate a union.
        - You can use Optional[X] as a shorthand for Union[X, None].

FILE
    /opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/particula/data/stream_stats.py