使用熊猫的"大数据"工作流程

Zel*_*ny7 913 python hdf5 large-data mongodb pandas

在学习大熊猫的过程中,我试图解决这个问题的答案已有好几个月了.我使用SAS进行日常工作,这非常适合它的核心支持.然而,由于其他许多原因,SAS作为一款软件非常糟糕.

有一天,我希望用python和pandas替换我对SAS的使用,但我目前缺乏大型数据集的核心工作流程.我不是在谈论需要分布式网络的"大数据",而是说文件太大而无法容纳在内存中,但又足够小以适应硬盘驱动器.

我的第一个想法是用于HDFStore在磁盘上保存大型数据集,并仅将我需要的部分拉入数据帧进行分析.其他人提到MongoDB是一种更容易使用的替代品.我的问题是:

有哪些最佳实践工作流程可用于完成以下任务:

  1. 将平面文件加载到永久的磁盘数据库结构中
  2. 查询该数据库以检索数据以提供给pandas数据结构
  3. 在操作pandas中的片段后更新数据库

真实世界的例子将非常受欢迎,尤其是那些在"大数据"上使用熊猫的人.

编辑 - 我希望如何工作的示例:

  1. 迭代导入大型平面文件并将其存储在永久的磁盘数据库结构中.这些文件通常太大而无法放入内存中.
  2. 为了使用Pandas,我想读取这些数据的子集(通常一次只有几列),它们可以适合内存.
  3. 我将通过对所选列执行各种操作来创建新列.
  4. 然后我必须将这些新列附加到数据库结构中.

我正在尝试找到执行这些步骤的最佳实践方法.阅读关于pandas和pytables的链接似乎附加一个新列可能是个问题.

编辑 - 特别回应杰夫的问题:

  1. 我正在构建消费者信用风险模型.数据种类包括电话,SSN和地址特征; 财产价值; 犯罪记录,破产等贬损信息......我每天使用的数据集平均有近1,000到2,000个字段的混合数据类型:数字和字符数据的连续,名义和序数变量.我很少附加行,但我会执行许多创建新列的操作.
  2. 典型操作涉及使用条件逻辑将多个列组合到新的复合列中.例如,if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'.这些操作的结果是我的数据集中每条记录的新列.
  3. 最后,我想将这些新列附加到磁盘上的数据结构中.我将重复第2步,使用交叉表和描述性统计数据探索数据,试图找到有趣,直观的模型关系.
  4. 典型的项目文件通常约为1GB.文件被组织成一种行,其中一行包括消费者数据的记录.每行对每条记录都有相同的列数.情况总是如此.
  5. 在创建新列时,我很少会按行进行子集化.但是,在创建报告或生成描述性统计信息时,对行进行子集化非常常见.例如,我可能想为特定的业务线创建一个简单的频率,比如零售信用卡.要做到这一点,除了我要报告的列之外,我只会选择那些业务线=零售的记录.但是,在创建新列时,我会提取所有数据行,只提取操作所需的列.
  6. 建模过程要求我分析每一列,寻找与某些结果变量的有趣关系,并创建描述这些关系的新化合物列.我探索的列通常以小集合完成.例如,我将专注于一组20个列,只处理属性值并观察它们与贷款违约的关系.一旦探索了这些并创建了新的列,我就转到另一组列,比如大学教育,然后重复这个过程.我正在做的是创建候选变量来解释我的数据和某些结果之间的关系.在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个方程式.

我很少会在数据集中添加行.我几乎总是会创建新的列(统计/机器学习用语中的变量或特征).

Jef*_*eff 581

我通常以这种方式使用数十亿字节的数据,例如我在磁盘上有表格,我通过查询读取,创建数据并追加.

值得阅读文档,在本主题的后期提供有关如何存储数据的若干建议.

详细信息将影响您存储数据的方式,例如:
尽可能多地提供详细信息; 我可以帮你建立一个结构.

  1. 数据大小,行数,列数,列数; 你是追加行还是只是列?
  2. 典型的操作会是什么样子.例如,对列进行查询以选择一堆行和特定列,然后执行操作(内存中),创建新列,保存这些列.
    (给出一个玩具示例可以让我们提供更具体的建议.)
  3. 经过那个处理,那你做什么?第2步是临时的,还是可重复的?
  4. 输入平面文件:Gb中有多少粗略的总大小.这些如何通过记录组织起来?每个文件是否包含不同的字段,或者每个文件中是否包含每个文件中包含所有字段的记录?
  5. 您是否曾根据条件选择行(记录)的子集(例如,选择字段A> 5的行)?然后做一些事情,或者你只是选择包含所有记录的字段A,B,C(然后做一些事情)?
  6. 您是否"处理"所有列(成组),或者是否有一个很好的比例,您只能用于报告(例如,您希望保留数据,但不需要明确列出该列最终结果时间)?

确保至少0.10.1安装了大熊猫.

读取chunk-by-chunk多个表查询的迭代文件.

由于pytables被优化为按行进行操作(这是您查询的内容),因此我们将为每组字段创建一个表.通过这种方式,可以轻松选择一小组字段(可以使用大表格,但这样做效率更高......我想我将来可以修复这个限制...这是无论如何更直观):(
以下是伪代码.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
Run Code Online (Sandbox Code Playgroud)

读取文件并创建存储(基本上做什么append_to_multiple):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])
Run Code Online (Sandbox Code Playgroud)

现在你已经拥有了文件中的所有表格(实际上你可以将它们存储在单独的文件中,如果你愿意,你可能需要将文件名添加到group_map,但可能这不是必需的).

这是您获取列并创建新列的方法:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
Run Code Online (Sandbox Code Playgroud)

当您准备好进行post_processing时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
Run Code Online (Sandbox Code Playgroud)

关于data_columns,您实际上不需要定义任何 data_columns; 它们允许您根据列子选择行.例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])
Run Code Online (Sandbox Code Playgroud)

在最终报告生成阶段,它们可能对您最感兴趣(实质上,数据列与其他列隔离,如果您定义了很多,这可能会影响效率).

您可能还想:

  • 创建一个获取字段列表的函数,在groups_map中查找组,然后选择这些并连接结果,以便得到结果帧(这实际上是select_as_multiple所做的).通过这种方式,结构对您来说非常透明.
  • 某些数据列上的索引(使行子集更快).
  • 启用压缩.

如果您有疑问,请告诉我!

  • @Jeff,熊猫目前处于0.17.x,现在上述问题已在熊猫中解决了吗? (10认同)
  • 谢谢你的链接.第二个链接让我有点担心我不能在HDFStore的表中添加新列?那是对的吗?另外,我添加了一个如何使用此设置的示例. (4认同)
  • @Jeff热衷于为您的答案添加快速更新以促进dask? (4认同)
  • hdf中的实际结构取决于您.Pytables是面向行的,在创建时具有固定列.创建表后,您无法追加列.但是,您可以创建一个与现有表索引相同的新表.(请参阅文档中的select_as_multiple示例).这样,您可以创建任意大小的对象,同时具有非常高效的查询.您使用数据的方式是如何在磁盘上进行组织的关键.向我发送一个包含更具体示例的伪代码的列表外电子邮件. (3认同)

use*_*356 126

我认为上面的答案缺少一个我发现非常有用的简单方法.

当我的文件太大而无法加载到内存中时,我会将文件分解为多个较小的文件(按行或列)

示例:如果30天大小的交易数据为30天,我将其分成每天约1GB大小的文件.我随后分别处理每个文件并在最后汇总结果

其中一个最大的优点是它允许并行处理文件(多个线程或进程)

另一个优点是文件操作(如在示例中添加/删除日期)可以通过常规shell命令来完成,这在更高级/复杂的文件格式中是不可能的

这种方法并不涵盖所有场景,但在很多场景中非常有用

  • 同意.随着所有的炒作,很容易忘记[命令行工具比Hadoop集群快235倍](http://aadrake.com/command-line-tools-can-be-235x-faster-than-your -hadoop-cluster.html) (34认同)
  • 更新的链接:https://adamdrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html (2认同)

Pri*_*ate 69

现在,在问题发生两年后,一个"核心外"的熊猫相当于:dask.太棒了!虽然它不支持所有的熊猫功能,但你可以使用它.

  • 对于一个完整的dask实例,请看看http://stackoverflow.com/questions/37979167/how-to-parallelize-many-fuzzy-string-comparisons-using-apply-in-pandas (4认同)
  • 它总是“脱离核心”吗?(即不是 RAM 密集型?)。恕我直言,如果您手头没有集群,那么 Dask 并不是一个好的解决方案。引用 [Dask 文档本身](https://docs.dask.org/en/latest/spark.html):*“如果您希望管理 1 TB 或更少的表格 CSV 或 JSON 数据,那么您应该忘记 Spark 和 Dask,使用 Postgres 或 MongoDB。”* (4认同)

rju*_*ney 59

如果您的数据集在1到20GB之间,那么您应该得到一个具有48GB RAM的工作站.然后Pandas可以将整个数据集保存在RAM中.我知道这不是你在这里寻找的答案,但在4GB内存的笔记本电脑上进行科学计算是不合理的.

  • 结果表明性能与时间和地点有关;) (32认同)
  • "在具有4GB RAM的笔记本电脑上进行科学计算是不合理的"定义合理.我认为UNIVAC会采取不同的看法.http://arstechnica.com/tech-policy/2011/09/univac-the-troubled-life-of-americas-first-computer/ (5认同)
  • 在具有 48GB RAM 的工作站上进行科学计算是不合理的。 (4认同)
  • 同意!尝试继续在内存中工作,即使它预先花费了 $$。如果您的工作带来了财务回报,那么随着时间的推移,您将通过提高效率来弥补开支。 (3认同)
  • 配备61GB / RAM的r4.2xlarge每小时$ .532。您正在进行哪种科学计算,那不那么有价值?听起来很不正常,即使不是很不合理。 (3认同)
  • @rjurney对不起,也许我应该删除我的评论。您对“不合理的”科学计算机的判断似乎很主观。我多年来在笔记本电脑上进行科学计算,这对我来说似乎足够了,因为大部分时间我都在编写代码。从编程的角度来看,我的算法比从计算角度来看要困难得多。另外,我非常确定编写可伸缩算法不应依赖当前的硬件限制。您对别人的计算的评论听起来有些冒犯(除了主观性之外),您介意删除这几个词吗? (2认同)
  • 主观且无益的不回答问题。对此可能存在经济上的争论,这取决于执行次数 x cpu 时间成本是否超过升级成本。这种方法只能通过硬件限制而不是数据大小来扩展。另外:在研究人员的笔记本电脑上已经取得了如此多的数据科学领域的辉煌成就!此外,没有钱投入大型工作站的学者、学生和初创公司呢?! (2认同)
  • @thclark您可以根据需要进行优化,然后再进行优化,但是使用大型计算机最容易完成大型数据集的科学计算……只要它适合RAM。如果不是,您需要一个分布式系统,但是他没有这个问题,因此我没有给出答案。至于钱,您不需要将钱倒入昂贵的工作站中。需要时,一次只租一个小时。您将为此感到更快乐,更有生产力! (2认同)

chi*_*aku 51

我知道这是一个旧线程,但我认为Blaze库值得一试.它是为这些类型的情况而构建的.

来自文档:

Blaze将NumPy和Pandas的可用性扩展到分布式和核心外计算.Blaze提供类似于NumPy ND-Array或Pandas DataFrame的界面,但将这些熟悉的界面映射到Postgres或Spark等各种其他计算引擎上.

编辑:顺便说一下,它由ContinuumIO和NumPy的作者Travis Oliphant支持.


bri*_*ler 46

这是pymongo的情况.我还在python中使用sql server,sqlite,HDF,ORM(SQLAlchemy)进行原型设计.首先,pymongo是基于文档的DB,因此每个人都是(dict属性的)文档.许多人组成一个集合,你可以有很多集合(人,股市,收入).

pd.dateframe - > pymongo注意:我使用chunksizein read_csv来保持5到10k记录(如果更大,pymongo会丢弃套接字)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))
Run Code Online (Sandbox Code Playgroud)

查询:gt =大于...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
Run Code Online (Sandbox Code Playgroud)

.find()返回一个迭代器,所以我通常ichunked用来切入更小的迭代器.

加入怎么样,因为我通常会将10个数据源粘贴在一起:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
Run Code Online (Sandbox Code Playgroud)

然后(在我的情况下,有时我必须aJoinDF首先在其"合并"之前进行聚合.)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')
Run Code Online (Sandbox Code Playgroud)

然后,您可以通过下面的更新方法将新信息写入主集合.(逻辑集合与物理数据源).

collection.update({primarykey:foo},{key:change})
Run Code Online (Sandbox Code Playgroud)

在较小的查找上,只是非规范化.例如,您在文档中有代码,只需添加字段代码文本并在dict创建文档时进行查找.

现在你有一个基于一个人的好数据集,你可以在每个案例中释放你的逻辑并创建更多属性.最后,你可以阅读大熊猫你的3到内存最大关键指标,并进行枢轴/聚合/数据探索.这对我有300万条记录的数字/大文/类别/代码/花车/ ...

您还可以使用MongoDB中内置的两种方法(MapReduce和聚合框架).有关聚合框架的更多信息,请参见此处,因为它似乎比MapReduce更容易,并且看起来很方便快速聚合工作.注意我不需要定义我的字段或关系,我可以添加项目到文档.在快速变化的numpy,pandas,python工具集的当前状态下,MongoDB帮助我开始工作:)

  • 是的,我为一个简单的范围DF做了同样的事情,numpy的int64似乎打扰了pymongo.我玩过的所有数据都来自CSV(与人工通过range()相比)并且类型很长,因此没有问题.在numpy你可以转换,但我确实看到这是有害的.我必须承认HDF的10.1项看起来令人兴奋. (2认同)

Joh*_*man 37

我发现这有点晚了,但我处理类似的问题(按揭预付款模式).我的解决方案是跳过pandas HDFStore层并使用直接pytables.我将每列保存为最终文件中的单个HDF5阵列.

我的基本工作流程是首先从数据库中获取CSV文件.我gzip它,所以它不是那么大.然后我将其转换为面向行的HDF5文件,通过在python中迭代它,将每一行转换为实际数据类型,并将其写入HDF5文件.这需要几十分钟,但它不使用任何内存,因为它只是逐行操作.然后我将面向行的HDF5文件"转置"为面向列的HDF5文件.

表转置如下:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()
Run Code Online (Sandbox Code Playgroud)

读回来然后看起来像:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)
Run Code Online (Sandbox Code Playgroud)

现在,我通常在拥有大量内存的机器上运行它,所以我可能对内存使用情况不够谨慎.例如,默认情况下,加载操作会读取整个数据集.

这通常适合我,但它有点笨重,我不能使用花哨的pytables魔术.

编辑:这种方法的真正优势在于记录数组的pytables默认值,然后我可以使用h5r将数据加载到R中,而h5r无法处理表.或者,至少,我无法让它加载异构表.


yts*_*aig 29

我发现有助于大数据用例的一个技巧是通过将浮点精度降低到32位来减少数据量.它并不适用于所有情况,但在许多应用中,64位精度是过度的,节省2倍的内存是值得的.使一个明显的观点更加明显:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
Run Code Online (Sandbox Code Playgroud)


wp7*_*8de 18

正如其他人所指出的那样,经过几年的出现,出现了"核心外"的大熊猫等同物:dask.虽然dask不是大熊猫及其所有功能的直接替代品,但它有以下几个原因:

Dask是一个灵活的分析计算并行计算库,针对"大数据"集合的交互式计算工作负载的动态任务调度进行了优化,如并行数组,数据帧和列表,将NumPy,Pandas或Python迭代器等常用接口扩展为更大 - 超过内存或分布式环境,并从笔记本电脑扩展到集群.

Dask强调以下优点:

  • 熟悉:提供并行化的NumPy数组和Pandas DataFrame对象
  • 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度界面.
  • Native:通过访问PyData堆栈,在Pure Python中启用分布式计算.
  • 快速:以低开销,低延迟和快速数值算法所需的最小序列化运行
  • 向上扩展:在具有1000个内核的集群上弹性运行向下扩展:在单个进程中设置并在笔记本电脑上运行的简单操作
  • 响应:设计时考虑到交互式计算,它提供快速反馈和诊断以帮助人类

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
Run Code Online (Sandbox Code Playgroud)

替换一些像这样的pandas代码:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
Run Code Online (Sandbox Code Playgroud)

并且,特别值得注意的是,通过concurrent.futures接口提供了一个用于提交自定义任务的通用:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
Run Code Online (Sandbox Code Playgroud)


Oph*_*tan 13

还有一个变种

在pandas中完成的许多操作也可以作为db查询完成(sql,mongo)

使用RDBMS或mongodb可以在DB Query中执行一些聚合(针对大数据进行优化,并有效地使用缓存和索引)

之后,您可以使用pandas执行后期处理.

这种方法的优点是你可以获得数据库优化来处理大数据,同时仍然用高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么和做什么的细节核心.

虽然查询语言和pandas不同,但将部分逻辑从一个转换为另一个通常并不复杂.


Rob*_*Rob 13

我想指出 Vaex 包。

Vaex 是一个用于惰性 Out-of-Core DataFrames(类似于 Pandas)的 Python 库,用于可视化和探索大型表格数据集。它可以在每秒高达十亿 (10 9 ) 个对象/行的 N 维网格上计算统计数据,例如平均值、总和、计数、标准偏差等。可视化是使用直方图、密度图和 3D 体积渲染完成的,允许对大数据进行交互式探索。Vaex 使用内存映射、零内存复制策略和延迟计算来获得最佳性能(不浪费内存)。

看一下文档:https : //vaex.readthedocs.io/en/latest/ API 非常接近 Pandas 的 API。


lev*_*lev 10

这里值得一提的是Ray,
它是一个分布式计算框架,它以分布式方式为pandas提供了自己的实现.

只需替换pandas导入,代码应该按原样运行:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual
Run Code Online (Sandbox Code Playgroud)

可以在这里阅读更多细节:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


Gol*_*key 8

如果您选择创建数据管道的简单路径,可以考虑使用Ruffus,该管道可以分解为多个较小的文件.


小智 6

我最近遇到了类似的问题.我发现只是以块的形式读取数据并附加它,因为我以块的形式将它写入相同的csv.我的问题是根据另一个表中的信息添加日期列,使用某些列的值,如下所示.这可能会帮助那些被dask和hdf5困惑的人,但更熟悉像我这样的熊猫.

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)
Run Code Online (Sandbox Code Playgroud)