Mars is a tensor-based unified framework for large-scale data computation that scales NumPy, pandas, and scikit-learn.
Documentation, Chinese documentation (中文文档)

Installation

Install Mars with pip:

pip install pymars

To install the extra dependencies required by the distributed version, use the command below.

pip install 'pymars[distributed]'

For now, the distributed version is only available on Linux and macOS.

Developer Install

To contribute code to Mars, follow the instructions below to install it for development:

git clone https://github.com/mars-project/mars.git
cd mars
pip install -e ".[dev]"

More details about installing Mars can be found in the getting started section of the Mars documentation.

Mars tensor

Mars tensor provides a familiar, NumPy-like interface.

NumPy:

import numpy as np
N = 200_000_000
a = np.random.uniform(-1, 1, size=(N, 2))
print((np.linalg.norm(a, axis=1) < 1)
      .sum() * 4 / N)

Output:

3.14151712
CPU times: user 12.5 s, sys: 7.16 s, total: 19.7 s
Wall time: 21.8 s

Mars tensor:

import mars.tensor as mt
N = 200_000_000
a = mt.random.uniform(-1, 1, size=(N, 2))
print(((mt.linalg.norm(a, axis=1) < 1)
        .sum() * 4 / N).execute())

Output:

3.14161908
CPU times: user 17.5 s, sys: 3.56 s, total: 21.1 s
Wall time: 5.59 s

Mars can leverage multiple cores, even on a laptop, and can be even faster in a distributed setting.
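
Both the NumPy and the Mars tensor code estimate π by Monte Carlo sampling. The reasoning behind the `* 4 / N` step can be spelled out as follows; the symbols p, x_i and the hatted π are introduced here for illustration and do not appear in the original.

```latex
% Points are drawn uniformly from the square [-1, 1]^2, which has area 4.
% The unit disk inside that square has area \pi, so a single point falls
% inside the disk with probability
\[
  p = \frac{\pi}{4}.
\]
% With N samples x_1, \dots, x_N, the observed fraction of hits estimates p,
% which gives the estimator computed by the code:
\[
  \hat{\pi} = \frac{4}{N} \sum_{i=1}^{N} \mathbf{1}\!\left[\lVert x_i \rVert_2 < 1\right].
\]
```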

Mars DataFrame

Mars DataFrame provides a familiar interface like pandas.

Pandas:

import numpy as np
import pandas as pd
df = pd.DataFrame(
    np.random.rand(100000000, 4),
    columns=list('abcd'))
print(df.sum())

Output:

CPU times: user 10.9 s, sys: 2.69 s, total: 13.6 s
Wall time: 11 s

Mars DataFrame:

import mars.tensor as mt
import mars.dataframe as md
df = md.DataFrame(
    mt.random.rand(100000000, 4),
    columns=list('abcd'))
print(df.sum().execute())

Output:

CPU times: user 16.5 s, sys: 3.52 s, total: 20 s
Wall time: 3.6 s
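
One way to see why a column sum lends itself to multi-core execution is that it can be computed as a sum of partial sums over row chunks. The sketch below uses only the Python standard library and is not a description of how Mars implements it; the helper names `column_sums` and `chunked_column_sums` and the chunk size are assumptions made for illustration.

```python
import random


def column_sums(rows):
    """Sum each column of a list of equal-length rows."""
    return [sum(col) for col in zip(*rows)]


def chunked_column_sums(rows, chunk_rows):
    """Split rows into chunks, sum each chunk, then add the partial sums."""
    chunks = [rows[i:i + chunk_rows] for i in range(0, len(rows), chunk_rows)]
    # Each partial sum depends only on its own chunk, so in principle
    # the chunks could be processed independently on different cores.
    partials = [column_sums(chunk) for chunk in chunks]
    return column_sums(partials)


rng = random.Random(0)
rows = [[rng.random() for _ in range(4)] for _ in range(10_000)]

direct = column_sums(rows)
chunked = chunked_column_sums(rows, chunk_rows=1_000)
print(dict(zip("abcd", chunked)))
```

The two results agree up to floating-point rounding, since the chunked version only changes the order in which the additions are performed.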

Mars learn

Mars learn provides a familiar interface like scikit-learn.

Scikit-learn:

from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
X, y = make_blobs(
    n_samples=100000000, n_features=3,
    centers=[[3, 3, 3], [0, 0, 0],
             [1, 1, 1], [2, 2, 2]],
    cluster_std=[0.2, 0.1, 0.2, 0.2],
    random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)

Mars learn:

from mars.learn.datasets import make_blobs
from mars.learn.decomposition import PCA
X, y = make_blobs(
    n_samples=100000000, n_features=3,
    centers=[[3, 3, 3], [0, 0, 0],
             [1, 1, 1], [2, 2, 2]],
    cluster_std=[0.2, 0.1, 0.2, 0.2],
    random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)

Mars remote

Mars remote allows users to execute functions in parallel.

Normal function calls:

import numpy as np

def calc_chunk(n, i):
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    return (d < 1).sum()

def calc_pi(fs, N):
    return sum(fs) * 4 / N

N = 200_000_000
n = 10_000_000

fs = [calc_chunk(n, i)
      for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

Output:

3.1416312
CPU times: user 32.2 s, sys: 4.86 s, total: 37.1 s
Wall time: 12.4 s

Mars remote:

import numpy as np
import mars.remote as mr

def calc_chunk(n, i):
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    return (d < 1).sum()

def calc_pi(fs, N):
    return sum(fs) * 4 / N

N = 200_000_000
n = 10_000_000

fs = [mr.spawn(calc_chunk, args=(n, i))
      for i in range(N // n)]
pi = mr.spawn(calc_pi, args=(fs, N))
print(pi.execute().fetch())

Output:

3.1416312
CPU times: user 16.9 s, sys: 5.46 s, total: 22.3 s
Wall time: 4.83 s
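
For readers who have not used a task-spawning API before, the fan-out and fan-in shape of the Mars remote code resembles submitting tasks to an executor and collecting their results. The sketch below uses only the standard library's `concurrent.futures` as an analogy; it is not how Mars remote is implemented, it uses a much smaller N so that it finishes quickly, and the names `counts` and `pi_estimate` are chosen here for illustration. Because the work is pure Python, the thread pool shows the structure only and is not expected to give a speedup.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def calc_chunk(n, i):
    """Count how many of n random points in [-1, 1]^2 fall inside the unit circle."""
    rs = random.Random(i)  # one seed per chunk, as in the original example
    hits = 0
    for _ in range(n):
        x, y = rs.uniform(-1, 1), rs.uniform(-1, 1)
        if x * x + y * y < 1:
            hits += 1
    return hits


def calc_pi(counts, total):
    """Combine the per-chunk hit counts into an estimate of pi."""
    return sum(counts) * 4 / total


N = 200_000  # far smaller than in the original, to keep the sketch fast
n = 10_000

with ThreadPoolExecutor() as pool:
    # Fan out: one task per chunk.
    futures = [pool.submit(calc_chunk, n, i) for i in range(N // n)]
    # Fan in: wait for every chunk and gather the counts.
    counts = [f.result() for f in futures]

pi_estimate = calc_pi(counts, N)
print(pi_estimate)
```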

Eager Mode

Mars supports an eager mode, which makes development and debugging easier.

To enable eager mode, set the option at the beginning of the program or console session.

>>> from mars.config import options
>>> options.eager_mode = True

Or use a context.

>>> from mars.config import option_context
>>> with option_context() as options:
...     options.eager_mode = True
...     # eager mode is on only inside the with statement
...     ...

When eager mode is on, tensors, DataFrames, and other Mars objects are executed immediately by the default session as soon as they are created.

>>> import mars.tensor as mt
>>> import mars.dataframe as md
>>> from mars.config import options
>>> options.eager_mode = True
>>> t = mt.arange(6).reshape((2, 3))
>>> t
array([[0, 1, 2],
       [3, 4, 5]])
>>> df = md.DataFrame(t)
>>> df.sum()
0    3
1    5
2    7
dtype: int64
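
The difference between deferred and eager execution, and the way a context can scope an option, can be illustrated with a toy model in plain Python. This is only a sketch of the idea and does not reflect Mars internals; `ToyOptions`, `toy_option_context` and `Deferred` are hypothetical names invented for this example.

```python
from contextlib import contextmanager


class ToyOptions:
    """Hypothetical stand-in for a global options object."""
    eager_mode = False


options = ToyOptions()


@contextmanager
def toy_option_context(**overrides):
    """Temporarily override options, restoring the old values on exit."""
    saved = {name: getattr(options, name) for name in overrides}
    for name, value in overrides.items():
        setattr(options, name, value)
    try:
        yield options
    finally:
        for name, value in saved.items():
            setattr(options, name, value)


class Deferred:
    """A computation that runs only when execute() is called."""

    def __init__(self, func, *args):
        self._func, self._args = func, args
        self.result = None
        self.executed = False
        if options.eager_mode:  # eager: run at creation time
            self.execute()

    def execute(self):
        if not self.executed:
            self.result = self._func(*self._args)
            self.executed = True
        return self.result


lazy = Deferred(sum, [1, 2, 3])
print(lazy.executed)                 # False: nothing has run yet

with toy_option_context(eager_mode=True):
    eager = Deferred(sum, [1, 2, 3])
print(eager.executed, eager.result)  # True 6: ran when it was created
print(options.eager_mode)            # False: restored after the with block

print(lazy.execute())                # 6: runs on demand
```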

Easy to scale in and scale out

Mars can scale in to a single machine and scale out to a cluster with thousands of machines. The local and distributed versions share the same code, so it is fairly simple to migrate from a single machine to a cluster as the data grows.

Running on a single machine covers both thread-based scheduling and local cluster scheduling, which bundles all of the distributed components. Mars is also easy to scale out to a cluster by starting the different components of the Mars distributed runtime on different machines in the cluster.

Threaded

By default, the execute method runs on the thread-based scheduler on a single machine.

>>> import mars.tensor as mt
>>> a = mt.ones((10, 10))
>>> a.execute()

Users can create a session explicitly.

>>> from mars.session import new_session
>>> session = new_session()
>>> (a * 2).execute(session=session)
>>> # the session is released on leaving the with statement
>>> with new_session() as session2:
...     (a / 3).execute(session=session2)

Local cluster

Users can start a local cluster, bundled with the distributed runtime, on a single machine. Local cluster mode requires the distributed version of Mars.

>>> from mars.deploy.local import new_cluster

>>> # cluster will create a session and set it as default
>>> cluster = new_cluster()

>>> # run on the local cluster
>>> (a + 1).execute()

>>> # create a session explicitly by specifying the cluster's endpoint
>>> session = new_session(cluster.endpoint)
>>> (a * 3).execute(session=session)

Distributed

After installing the distributed version on every node in the cluster, select one node as the scheduler and another as the web service, leaving the other nodes as workers. The scheduler can be started with the following command:

mars-scheduler -a <scheduler_ip> -p <scheduler_port>

The web service can be started with the following command:

mars-web -a <web_ip> -s <scheduler_endpoint> --ui-port <ui_port_exposed_to_user>

Workers can be started with the following command:

mars-worker -a <worker_ip> -p <worker_port> -s <scheduler_endpoint>

After all Mars processes are started, users can run:

>>> import mars.tensor as mt
>>> from mars.session import new_session
>>> sess = new_session('http://<web_ip>:<ui_port_exposed_to_user>')
>>> a = mt.ones((2000, 2000), chunk_size=200)
>>> b = mt.inner(a, a)
>>> b.execute(session=sess)

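The chunk_size=200 argument in the distributed example controls how the tensor is split into pieces. Assuming an integer chunk size is applied uniformly to every dimension, the resulting chunk grid can be worked out with a little arithmetic. The helper `chunk_grid` below is hypothetical and written only for this illustration; it is not part of the Mars API.

```python
import math


def chunk_grid(shape, chunk_size):
    """Return (chunks per dimension, total chunk count) for a uniform chunk size."""
    per_dim = tuple(math.ceil(dim / chunk_size) for dim in shape)
    return per_dim, math.prod(per_dim)


per_dim, total = chunk_grid((2000, 2000), 200)
print(per_dim, total)  # (10, 10) 100
```

Under that assumption, the 2000 x 2000 tensor is divided into a 10 x 10 grid of 200 x 200 chunks, which gives the workers 100 independent pieces to distribute.
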
Getting involved

Thank you in advance for your contributions!
