Source code for baloo.core.series

from collections import OrderedDict

import numpy as np
from pandas import Series as PandasSeries
from tabulate import tabulate

from .generic import BinaryOps, BitOps, BalooCommon
from .indexes import Index, MultiIndex
from .utils import infer_dtype, default_index, check_type, is_scalar, check_valid_int_slice, check_weld_bit_array, \
    convert_to_numpy, check_dtype, shorten_data
from ..weld import LazyArrayResult, weld_compare, numpy_to_weld_type, weld_filter, \
    weld_slice, weld_array_op, weld_invert, weld_tail, weld_element_wise_op, LazyDoubleResult, LazyScalarResult, \
    weld_mean, weld_variance, weld_standard_deviation, WeldObject, weld_agg, weld_iloc_indices, \
    weld_iloc_indices_with_missing, weld_unique, default_missing_data_literal, weld_replace, weld_udf


[docs]class Series(LazyArrayResult, BinaryOps, BitOps, BalooCommon): """Weld-ed Pandas Series. Attributes ---------- index dtype name iloc See Also -------- pandas.Series : https://pandas.pydata.org/pandas-docs/stable/generated/pandas.Series.html Examples -------- >>> import baloo as bl >>> import numpy as np >>> sr = bl.Series([0, 1, 2]) >>> sr Series(name=None, dtype=int64) >>> sr.index RangeIndex(start=0, stop=3, step=1) >>> sr = sr.evaluate() >>> sr # repr Series(name=None, dtype=int64) >>> print(sr) # str <BLANKLINE> --- -- 0 0 1 1 2 2 >>> sr.index Index(name=None, dtype=int64) >>> print(sr.index) [0 1 2] >>> len(sr) # eager computation 3 >>> sr.values array([0, 1, 2]) >>> (sr + 2).evaluate().values array([2, 3, 4]) >>> (sr - bl.Index(np.arange(3))).evaluate().values array([0, 0, 0]) >>> print(sr.max().evaluate()) 2 >>> print(sr.var().evaluate()) 1.0 >>> print(sr.agg(['min', 'std']).evaluate()) <BLANKLINE> --- -- min 0 std 1 """ _empty_text = 'Empty Series' # TODO: Fix en/decoding string's dtype; e.g. filter returns max length dtype (|S11) even if actual result is |S7
[docs] def __init__(self, data=None, index=None, dtype=None, name=None): """Initialize a Series object. Parameters ---------- data : numpy.ndarray or WeldObject or list, optional Raw data or Weld expression. index : Index or RangeIndex or MultiIndex, optional Index linked to the data; it is assumed to be of the same length. RangeIndex by default. dtype : numpy.dtype or type, optional Desired Numpy dtype for the elements. If type, it must be a NumPy type, e.g. np.float32. If data is np.ndarray with a dtype different to dtype argument, it is astype'd to the argument dtype. Note that if data is WeldObject, one must explicitly astype to convert type. Inferred from `data` by default. name : str, optional Name of the Series. """ data, dtype = _process_input(data, dtype) self.index = _process_index(index, data) self.dtype = dtype self.name = check_type(name, str) # TODO: this should be used to annotate Weld code for speedups self._length = len(data) if isinstance(data, np.ndarray) else None super(Series, self).__init__(data, numpy_to_weld_type(self.dtype))
@property def name(self): return self._name @name.setter def name(self, value): self._name = value @property def iloc(self): """Retrieve Indexer by index. Supported iloc functionality exemplified below. Returns ------- _ILocIndexer Examples -------- >>> sr = bl.Series(np.arange(3)) >>> print(sr.iloc[2].evaluate()) 2 >>> print(sr.iloc[0:2].evaluate()) <BLANKLINE> --- -- 0 0 1 1 >>> print(sr.iloc[bl.Series(np.array([0, 2]))].evaluate()) <BLANKLINE> --- -- 0 0 2 2 """ from .indexing import _ILocIndexer return _ILocIndexer(self) @property def str(self): """Get Access to string functions. Returns ------- StringMethods Examples -------- >>> sr = bl.Series([b' aB ', b'GoOsfrABA']) >>> print(sr.str.lower().evaluate()) <BLANKLINE> --- --------- 0 ab 1 goosfraba """ if self.dtype.char != 'S': raise AttributeError('Can only use .str when data is of type np.bytes_') from .strings import StringMethods return StringMethods(self) def __repr__(self): return "{}(name={}, dtype={})".format(self.__class__.__name__, self.name, self.dtype) def __str__(self): if self.empty: return self._empty_text # index str_data = OrderedDict() str_data.update((name, shorten_data(data.values)) for name, data in self.index._gather_data(' ').items()) # self data name = '' if self.name is None else self.name str_data[name] = shorten_data(self.values) return tabulate(str_data, headers='keys') def _comparison(self, other, comparison): if other is None: other = default_missing_data_literal(self.weld_type) return _series_compare(self, other, comparison) elif is_scalar(other): return _series_compare(self, other, comparison) else: raise TypeError('Can currently only compare with scalars') def _bitwise_operation(self, other, operation): check_type(other, LazyArrayResult) check_weld_bit_array(other) check_weld_bit_array(self) return _series_array_op(self, other, operation) def _element_wise_operation(self, other, operation): if isinstance(other, LazyArrayResult): return _series_array_op(self, other, operation) elif is_scalar(other): return _series_element_wise_op(self, other, operation) else: raise TypeError('Can only apply operation with scalar or Series') def astype(self, dtype): check_dtype(dtype) return Series(self._astype(dtype), self.index, dtype, self.name)
[docs] def __getitem__(self, item): """Select from the Series. Supported selection functionality exemplified below. Examples -------- >>> sr = bl.Series(np.arange(5, dtype=np.float32), name='Test') >>> sr = sr[sr > 0] >>> sr Series(name=Test, dtype=float32) >>> print(sr.evaluate()) Test --- ------ 1 1 2 2 3 3 4 4 >>> sr = sr[(sr != 1) & ~(sr > 3)] >>> print(sr.evaluate()) Test --- ------ 2 2 3 3 >>> print(sr[:1].evaluate()) Test --- ------ 2 2 """ if isinstance(item, LazyArrayResult): check_weld_bit_array(item) return _series_filter(self, item, self.index[item]) elif isinstance(item, slice): check_valid_int_slice(item) return _series_slice(self, item, self.index[item]) else: raise TypeError('Expected a LazyArrayResult or a slice')
def __invert__(self): check_weld_bit_array(self) return Series(weld_invert(self.weld_expr), self.index, self.dtype, self.name) # TODO: perhaps skip making a new object if data is raw already?
[docs] def evaluate(self, verbose=False, decode=True, passes=None, num_threads=1, apply_experimental=True): """Evaluates by creating a Series containing evaluated data and index. See `LazyResult` Returns ------- Series Series with evaluated data and index. Examples -------- >>> sr = bl.Series(np.arange(3)) > 0 >>> weld_code = sr.values # accessing values now returns the weld code as a string >>> sr = sr.evaluate() >>> sr.values # now it is evaluated to raw data array([False, True, True]) """ # TODO: work on masking (branch masking) ~ evaluate the mask first and use on both index and data; # TODO right now the filter gets computed twice, once for index and once for the data evaluated_data = super(Series, self).evaluate(verbose, decode, passes, num_threads, apply_experimental) evaluated_index = self.index.evaluate(verbose, decode, passes, num_threads, apply_experimental) return Series(evaluated_data, evaluated_index, self.dtype, self.name)
[docs] def head(self, n=5): """Return Series with first n values. Parameters ---------- n : int Number of values. Returns ------- Series Series containing the first n values. Examples -------- >>> sr = bl.Series(np.arange(3)) >>> print(sr.head(2).evaluate()) <BLANKLINE> --- -- 0 0 1 1 """ return self[:n]
[docs] def tail(self, n=5): """Return Series with the last n values. Parameters ---------- n : int Number of values. Returns ------- Series Series containing the last n values. Examples -------- >>> sr = bl.Series(np.arange(3)) >>> print(sr.tail(2).evaluate()) <BLANKLINE> --- -- 1 1 2 2 """ if self._length is not None: length = self._length else: length = self._lazy_len().weld_expr return _series_tail(self, self.index.tail(n), length, n)
def sum(self): return LazyScalarResult(self._aggregate('+').weld_expr, self.weld_type) def prod(self): return LazyScalarResult(self._aggregate('*').weld_expr, self.weld_type) def count(self): return self._lazy_len() def mean(self): return LazyDoubleResult(weld_mean(self.weld_expr, self.weld_type)) def var(self): return LazyDoubleResult(weld_variance(self.weld_expr, self.weld_type)) def std(self): return LazyDoubleResult(weld_standard_deviation(self.weld_expr, self.weld_type)) # TODO: currently casting everything to float64 (even if already f64 ~ weld_aggs TODO); # TODO maybe for min/max/count/sum/prod/.. cast ints to int64 like pandas does
[docs] def agg(self, aggregations): """Multiple aggregations optimized. Parameters ---------- aggregations : list of str Which aggregations to perform. Returns ------- Series Series with resulting aggregations. """ check_type(aggregations, list) new_index = Index(np.array(aggregations, dtype=np.bytes_), np.dtype(np.bytes_)) return _series_agg(self, aggregations, new_index)
[docs] def unique(self): """Return unique values in the Series. Note that because it is hash-based, the result will NOT be in the same order (unlike pandas). Returns ------- LazyArrayResult Unique values in random order. """ return LazyArrayResult(weld_unique(self.values, self.weld_type), self.weld_type)
[docs] def dropna(self): """Returns Series without null values according to Baloo's convention. Returns ------- Series Series with no null values. """ return self[self.notna()]
[docs] def fillna(self, value): """Returns Series with missing values replaced with value. Parameters ---------- value : {int, float, bytes, bool} Scalar value to replace missing values with. Returns ------- Series With missing values replaced. """ if not is_scalar(value): raise TypeError('Value to replace with is not a valid scalar') return Series(weld_replace(self.weld_expr, self.weld_type, default_missing_data_literal(self.weld_type), value), self.index, self.dtype, self.name)
[docs] def apply(self, func, mapping=None, new_dtype=None, **kwargs): """Apply an element-wise UDF to the Series. There are currently 6 options for using a UDF. First 4 are lazy, other 2 are eager and require the use of the raw decorator: - One of the predefined functions in baloo.functions. - Implementing a function which encodes the result. kwargs are automatically passed to it. - Pure Weld code and mapping. - Weld code and mapping along with a dynamically linked C++ lib containing the UDF. - Using a NumPy function, which however is EAGER and hence requires self.values to be raw. Additionally, NumPy does not support kwargs in (all) functions so must use raw decorator to strip away weld_type. - Implementing an eager function with the same precondition as above. Use the raw decorator to check this. Parameters ---------- func : function or str Weld code as a str to encode or function from baloo.functions. mapping : dict, optional Additional mappings in the weld_template to replace on execution. self is added by default to reference to this Series. new_dtype : numpy.dtype, optional Specify the new dtype of the result Series. If None, it assumes it's the same dtype as before the apply. Returns ------- Series With UDF result. Examples -------- >>> import baloo as bl >>> sr = bl.Series([1, 2, 3]) >>> weld_template = 'map({self}, |e| e + {scalar})' >>> mapping = {'scalar': '2L'} >>> print(sr.apply(weld_template, mapping).evaluate()) <BLANKLINE> --- -- 0 3 1 4 2 5 >>> weld_template2 = 'map({self}, |e| e + 3L)' >>> print(sr.apply(weld_template2).evaluate()) <BLANKLINE> --- -- 0 4 1 5 2 6 >>> print(bl.Series([1., 4., 100.]).apply(bl.sqrt).evaluate()) # lazy predefined function <BLANKLINE> --- -- 0 1 1 2 2 10 >>> sr = bl.Series([4, 2, 3, 1]) >>> print(sr.apply(bl.sort, kind='q').evaluate()) # eager wrapper over np.sort (which uses raw decorator) <BLANKLINE> --- -- 0 1 1 2 2 3 3 4 >>> print(sr.apply(bl.raw(np.sort, kind='q')).evaluate()) # np.sort directly <BLANKLINE> --- -- 0 1 1 2 2 3 3 4 >>> print(sr.apply(bl.raw(lambda x: np.sort(x, kind='q'))).evaluate()) # lambda also works, with x = np.array <BLANKLINE> --- -- 0 1 1 2 2 3 3 4 # check tests/core/cudf/* and tests/core/test_series.test_cudf for C UDF example """ if callable(func): return Series(func(self.values, weld_type=self.weld_type, **kwargs), self.index, self.dtype, self.name) elif isinstance(func, str): check_type(mapping, dict) check_dtype(new_dtype) default_mapping = {'self': self.values} if mapping is None: mapping = default_mapping else: mapping.update(default_mapping) if new_dtype is None: new_dtype = self.dtype return Series(weld_udf(func, mapping), self.index, new_dtype, self.name) else: raise TypeError('Expected function or str defining a weld_template')
[docs] @classmethod def from_pandas(cls, series): """Create baloo Series from pandas Series. Parameters ---------- series : pandas.series.Series Returns ------- Series """ from pandas import Index as PandasIndex, MultiIndex as PandasMultiIndex if isinstance(series.index, PandasIndex): baloo_index = Index.from_pandas(series.index) elif isinstance(series.index, PandasMultiIndex): baloo_index = MultiIndex.from_pandas(series.index) else: raise TypeError('Cannot convert pandas index of type={} to baloo'.format(type(series.index))) return _series_from_pandas(series, baloo_index)
[docs] def to_pandas(self): """Convert to pandas Series Returns ------- pandas.series.Series """ pandas_index = self.index.to_pandas() return _series_to_pandas(self, pandas_index)
def _process_input(data, dtype): if data is None: return np.empty(0), np.dtype(np.float64) else: check_type(data, (np.ndarray, WeldObject, list)) check_dtype(dtype) if isinstance(data, list): data = convert_to_numpy(data) inferred_dtype = infer_dtype(data, dtype) if isinstance(data, np.ndarray) and data.dtype.char != inferred_dtype.char: data = data.astype(inferred_dtype) return data, inferred_dtype def _process_index(index, data): if index is None: return default_index(data) else: return check_type(index, (Index, MultiIndex)) def _series_compare(series, other, comparison): return Series(weld_compare(series.weld_expr, other, comparison, series.weld_type), series.index, np.dtype(np.bool), series.name) # the following methods are shortcuts for DataFrame ops to avoid e.g. recomputing index def _series_agg(series, aggregations, index): return Series(weld_agg(series.weld_expr, series.weld_type, aggregations), index, np.dtype(np.float64)) def _series_array_op(series, other, operation): return Series(weld_array_op(series.weld_expr, other.weld_expr, series.weld_type, operation), series.index, series.dtype, series.name) def _series_element_wise_op(series, other, operation): return Series(weld_element_wise_op(series.weld_expr, series.weld_type, other, operation), series.index, series.dtype, series.name) def _series_filter(series, item, index): return Series(weld_filter(series.weld_expr, series.weld_type, item.weld_expr), index, series.dtype, series.name) def _series_slice(series, item, index): return Series(weld_slice(series.weld_expr, series.weld_type, item), index, series.dtype, series.name) def _series_tail(series, index, length, n): return Series(weld_tail(series.weld_expr, length, n), index, series.dtype, series.name) def _series_iloc(series, item, new_index): return Series(weld_iloc_indices(series.weld_expr, series.weld_type, item), new_index, series.dtype, series.name) def _series_iloc_with_missing(series, item, new_index): return Series(weld_iloc_indices_with_missing(series.weld_expr, series.weld_type, item), new_index, series.dtype, series.name) def _series_from_pandas(series, baloo_index): return Series(series.values, baloo_index, series.dtype, series.name) def _series_to_pandas(series, pandas_index): return PandasSeries(series.values, pandas_index, series.dtype, series.name)