Stream: Averaging and Outliers#
This example shows how to clean up a stream of data by removing outliers and averaging the values over time.
Working path#
Set the working path where the data is stored. For now we’ll use the example data that ships with particula.
But the path could be anywhere on your computer. For example, if your data sits in a
folder called “data” on another drive, you could set the path to:
path = "U:\\data\\processing\\Campgain2023_of_aswsome\\data"
# all the imports, but we'll go through them one by one as we use them
import os
import matplotlib.pyplot as plt
from particula.data import loader_interface, settings_generator, stream_stats
from particula.data.tests.example_data.get_example_data import get_data_folder
# set the parent directory of the data folder
path = get_data_folder()
print('Path to data folder:')
print(path.rsplit('particula')[-1])
Path to data folder:
/data/tests/example_data
Load the data#
For this example we’ll use the provided example data, but you can change the path to any folder on your computer. We can then use the settings generator to describe how the files should be read, and pass those settings to the loader.
# This method uses the settings_generator module to generate the settings.
settings = settings_generator.for_general_1d_load(
    relative_data_folder='CPC_3010_data',
    filename_regex='*.csv',
    file_min_size_bytes=10,
    data_checks={
        "characters": [10, 100],
        "char_counts": {",": 4},
        "skip_rows": 0,
        "skip_end": 0,
    },
    data_column=[1, 2],
    data_header=['CPC_count[#/sec]', 'Temperature[degC]'],
    time_column=[0],
    time_format='epoch',
    delimiter=',',
    time_shift_seconds=0,
    timezone_identifier='UTC',
)
# now call the loader interface
data_stream = loader_interface.load_files_interface(
    path=path,
    settings=settings,
)
Loading file: CPC_3010_data_20220710_Jul.csv
Loading file: CPC_3010_data_20220709_Jul.csv
# print data stream summary
print('Stream:')
print(data_stream)
Stream:
Stream(header=['CPC_count[#/sec]', 'Temperature[degC]'], data=array([[3.3510e+04, 1.7000e+01],
[3.3465e+04, 1.7100e+01],
[3.2171e+04, 1.7000e+01],
...,
[1.9403e+04, 1.6900e+01],
[2.0230e+04, 1.7000e+01],
[1.9521e+04, 1.6800e+01]]), time=array([1.65734280e+09, 1.65734281e+09, 1.65734281e+09, ...,
1.65751559e+09, 1.65751560e+09, 1.65751560e+09]), files=[['CPC_3010_data_20220710_Jul.csv', 1078191], ['CPC_3010_data_20220709_Jul.csv', 1011254]])
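The `time` array printed above holds epoch seconds, while the plotting cells in this example use `data_stream.datetime64` for the x-axis. How particula builds that attribute is not shown here. The following is only a minimal NumPy sketch of the conversion, and the two timestamps are illustrative values rounded to match the printed output, not values read from the files.

```python
import numpy as np

# Illustrative epoch timestamps in seconds (rounded, assumed values).
epoch_seconds = np.array([1657342800.0, 1657343400.0])

# Cast to whole seconds first, then reinterpret as datetime64 with second resolution.
as_datetime = epoch_seconds.astype("int64").astype("datetime64[s]")

print(as_datetime[0])  # 2022-07-09T05:00:00
print(as_datetime[1] - as_datetime[0])  # 600 seconds
```

Matplotlib accepts `datetime64` arrays directly as x-values, which is why the plots can label the axis as time in UTC.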
# plot the data
fig, ax = plt.subplots()
ax.plot(
    data_stream.datetime64,
    data_stream.data[:, 0],  # data_stream.data is a 2d array, so we need
    # to specify which column we want to plot
    label=data_stream.header[0],
    linestyle="none",
    marker=".",
)
plt.xticks(rotation=45)
ax.set_xlabel("Time (UTC)")
ax.set_ylabel(data_stream.header[0])
fig.tight_layout()  # adjust the layout before the figure is shown
plt.show()
Average the data#
Now that we have the data loaded, we can average the data over time. We’ll use the ‘particula.data.stream_stats’ module to do this. The module has a function called ‘average_std’ that takes a stream object and returns a new stream object with the averaged data and the standard deviation of the data. The interval is given in the same units as the stream’s time array, so here it is 600 seconds.
stream_averaged = stream_stats.average_std(
    stream=data_stream,
    average_interval=600,
)
stream_averaged.standard_deviation.shape
(288, 2)
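The time array spans roughly 172,800 seconds, so 600-second intervals give the 288 rows seen in the shape above. To make the idea of interval averaging concrete, here is a minimal NumPy sketch on synthetic data. It is not particula's implementation: the helper name `interval_mean_std` is hypothetical, and the choice to start bins at the first timestamp is an assumption made for illustration.

```python
import numpy as np


def interval_mean_std(time, data, interval):
    """Return bin start times, per-bin means, and per-bin standard deviations.

    Hypothetical helper for illustration; not part of particula.
    """
    # Assign every sample to a bin counted from the first timestamp.
    bin_index = ((time - time[0]) // interval).astype(int)
    n_bins = bin_index.max() + 1
    bin_start = time[0] + interval * np.arange(n_bins)

    # Bins that contain no samples stay NaN.
    mean = np.full((n_bins, data.shape[1]), np.nan)
    std = np.full((n_bins, data.shape[1]), np.nan)
    for i in range(n_bins):
        rows = data[bin_index == i]
        if len(rows) > 0:
            mean[i] = rows.mean(axis=0)
            std[i] = rows.std(axis=0)
    return bin_start, mean, std


# Synthetic data: 20 one-second samples, two columns.
time = np.arange(20.0)
data = np.column_stack([time, np.ones_like(time)])
bin_start, mean, std = interval_mean_std(time, data, interval=10)
print(mean[:, 0])  # means of the samples 0..9 and 10..19
print(std.shape)  # one row per interval, one column per data column
```

As with `stream_averaged.standard_deviation`, the standard deviation array has the same shape as the averaged data, so each averaged point carries its own spread.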
Plot the averaged data#
fig, ax = plt.subplots()
ax.plot(
    stream_averaged.datetime64,
    stream_averaged.data[:, 0],
    label=stream_averaged.header[0],
    marker=".",
)
plt.xticks(rotation=45)
ax.set_xlabel("Time (UTC)")
ax.set_ylabel(stream_averaged.header[0])
fig.tight_layout()  # adjust the layout before the figure is shown
plt.show()
Clean up the data#
The data may still contain some outliers. We can use the ‘particula.data.stream_stats’ module to remove them. The module has a function called ‘filtering’ that takes a stream object and returns a new stream object with the outliers removed. Here we set an upper bound with ‘top’ and pass ‘drop=True’ so that filtered-out points are removed instead of being replaced.
stream_filtered = stream_stats.filtering(
    stream=data_stream,
    top=250000,
    drop=True,
)
fig, ax = plt.subplots()
ax.plot(
    stream_filtered.datetime64,
    stream_filtered.data[:, 0],
    label=stream_filtered.header[0],
    marker=".",
)
plt.xticks(rotation=45)
ax.set_xlabel("Time (UTC)")
ax.set_ylabel(stream_filtered.header[0])
fig.tight_layout()  # adjust the layout before the figure is shown
plt.show()
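The two ways of handling filtered-out points, dropping them or replacing them, can be pictured with a boolean mask. The sketch below uses plain NumPy on synthetic data. The helper names are hypothetical, and details such as whether the bound is inclusive, or whether it is tested against one column or all columns, are assumptions here and may differ from what `stream_stats.filtering` does.

```python
import numpy as np


def drop_above(time, data, top, column=0):
    """Drop every row whose value in `column` exceeds `top` (hypothetical helper)."""
    keep = data[:, column] <= top
    return time[keep], data[keep]


def replace_above(data, top, replace_with=np.nan, column=0):
    """Return a copy where values above `top` in `column` are replaced."""
    cleaned = data.astype(float, copy=True)
    cleaned[cleaned[:, column] > top, column] = replace_with
    return cleaned


# Synthetic counts with one obvious spike in the third row.
time = np.arange(5.0)
data = np.array([
    [1.0, 17.0],
    [2.0, 17.0],
    [300000.0, 17.0],
    [3.0, 17.0],
    [4.0, 17.0],
])

time_dropped, data_dropped = drop_above(time, data, top=250000)
data_replaced = replace_above(data, top=250000)
print(data_dropped.shape)  # the spike row is gone
print(data_replaced[2])  # the spike value is NaN, the row is kept
```

Dropping changes the length of the time array, which matters if the stream is later compared point by point with another stream. Replacing with NaN keeps the time axis intact.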
Summary#
This example showed how to clean up a stream of data by removing outliers and averaging the values over time. The full list of functions in the module is printed below.
help(stream_stats)
Help on module particula.data.stream_stats in particula.data:
NAME
particula.data.stream_stats - Functions to operate on stream objects.
FUNCTIONS
average_std(stream: particula.data.stream.Stream, average_interval: Union[float, int] = 60, new_time_array: Optional[numpy.ndarray] = None) -> particula.data.stream.StreamAveraged
Calculate the average and standard deviation of data within a given
'stream' object over specified intervals.
This function takes a 'stream' object, which should contain time-series
data, and computes the average and standard deviation of the data at
intervals specified by 'average_interval'. If data.time is in seconds
then the units of the interval are seconds (hour in hours etc). The
results are returned as a new 'StreamAveraged' object containing the
processed data.
Args:
- stream (object): The input stream object containing 'time' and 'data'
arrays along with other associated metadata.
- average_interval (float|int, optional): The time interval over which the
averaging is to be performed.
- new_time_array (np.ndarray, optional): An optional array of time points
at which the average and standard deviation are computed.
If not provided, a new time array is generated based on the start and
end times within the 'stream.time' object.
Returns:
- StreamAveraged (object): An object of type 'StreamAveraged' containing
the averaged data, time array, start and stop times, the standard
deviation of the averaged data, and other metadata from the original
'stream' object.
The function checks for an existing 'new_time_array' and generates one if
needed. It then calculates the average and standard deviation for each
interval and constructs a 'StreamAveraged' object with the results and
metadata from the original 'stream' object.
drop_masked(stream: particula.data.stream.Stream, mask: numpy.ndarray) -> particula.data.stream.Stream
Drop rows where mask is false, and return data stream.
Args
----------
stream : object
data stream object
mask : np.ndarray
mask to apply to data stream
Returns
-------
object
stream object
filtering(stream: particula.data.stream.Stream, bottom: Optional[float] = None, top: Optional[float] = None, value: Optional[float] = None, invert: Optional[bool] = False, clone: Optional[bool] = True, replace_with: Union[float, int, NoneType] = None, drop: Optional[bool] = False, header: Union[list, int, str, NoneType] = None) -> particula.data.stream.Stream
Filters the data of the given 'stream' object based on the specified
bounds or specific value. The filtered data can be either dropped or
replaced with a specified value. Note, not all parameters need to be
specified, but at least one must be provided (top, bottom, value)
Args:
- stream (Stream): The input stream object containing 'data' and 'time'
attributes.
- bottom (float, optional): The lower bound for filtering data. Defaults
to None.
- top (float, optional): The upper bound for filtering data.
Defaults to None.
- value (float, optional): Specific value to filter from data.
Defaults to None.
- invert (bool): If True, inverts the filter criteria.
Defaults to False.
- clone (bool): If True, returns a copy of the 'stream' object, with
filtered data. If False, modifies the 'stream' object in-place.
Defaults to True.
- replace_with (float|int, optional): Value to replace filtered-out data.
Defaults to None.
- drop (bool, optional): If True, filtered-out data points are dropped
from the dataset. Defaults to False.
- header (list, optional): The header of the data to filter on. This can
same as calling Stream['header']
Defaults to None.
Returns:
- Stream: The 'stream' object with data filtered as specified.
If 'drop' is True, 'replace_with' is ignored and filtered data points are
removed from the 'stream' object. Otherwise, filtered data points are
replaced with 'replace_with' value.
add specific data row to filter on
remove_time_window(stream: particula.data.stream.Stream, epoch_start: Union[float, int], epoch_end: Union[float, int, NoneType] = None) -> particula.data.stream.Stream
Remove a time window from a stream object.
Args:
- stream: The input stream object containing 'data' and 'time'
attributes.
- epoch_start: The start time of the time window to be
removed.
- epoch_end: The end time of the time window to be
removed. If not provided, the time window is the closest time point to
'epoch_start'.
Returns:
- Stream: The 'stream' object with the specified time window removed.
DATA
Optional = typing.Optional
Optional[X] is equivalent to Union[X, None].
Union = typing.Union
Union type; Union[X, Y] means either X or Y.
On Python 3.10 and higher, the | operator
can also be used to denote unions;
X | Y means the same thing to the type checker as Union[X, Y].
To define a union, use e.g. Union[int, str]. Details:
- The arguments must be types and there must be at least one.
- None as an argument is a special case and is replaced by
type(None).
- Unions of unions are flattened, e.g.::
assert Union[Union[int, str], float] == Union[int, str, float]
- Unions of a single argument vanish, e.g.::
assert Union[int] == int # The constructor actually returns int
- Redundant arguments are skipped, e.g.::
assert Union[int, str, int] == Union[int, str]
- When comparing unions, the argument order is ignored, e.g.::
assert Union[int, str] == Union[str, int]
- You cannot subclass or instantiate a union.
- You can use Optional[X] as a shorthand for Union[X, None].
FILE
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/particula/data/stream_stats.py
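The help output also lists `remove_time_window`, which this example does not call. Based only on the docstring above, its behaviour can be sketched in NumPy as follows. The helper name `remove_window` is hypothetical, and treating both window endpoints as inclusive is an assumption, not something the docstring states.

```python
import numpy as np


def remove_window(time, data, epoch_start, epoch_end=None):
    """Remove samples inside a time window (hypothetical helper, not particula)."""
    if epoch_end is None:
        # No end given: remove only the sample closest to epoch_start.
        keep = np.ones(time.shape, dtype=bool)
        keep[np.argmin(np.abs(time - epoch_start))] = False
    else:
        # Keep everything strictly outside [epoch_start, epoch_end].
        keep = (time < epoch_start) | (time > epoch_end)
    return time[keep], data[keep]


# Synthetic data: 10 one-second samples, one column.
time = np.arange(10.0)
data = time.reshape(-1, 1)

window_time, window_data = remove_window(time, data, epoch_start=3, epoch_end=5)
single_time, single_data = remove_window(time, data, epoch_start=4.2)
print(window_time)  # samples at 3, 4, and 5 removed
print(single_time)  # only the sample at 4 removed
```

This kind of removal is useful when a known event, such as an instrument calibration, contaminates a stretch of the record that a simple value bound would not catch.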