我有一个按组和日期时间组织的数据框以及相应的值列。您可以将价值 col 视为资产 A、B、C 等(组 col)的价值。我正在尝试计算每个日期每组的总收益/损失。在第一次进入之前,相应的值为 0。例如,下面在 12/31/2019 和 1/1/2020 0:00 之间,A 的值从 -43 到 19。因此,A 在两者之间的利润日期应为 19-(-43) = 62。
我可以通过以下方式实现这个结果:
df.groupby([time_index, group]).sum().groupby(group).diff().groupby(group).cumsum()
Run Code Online (Sandbox Code Playgroud)
但是,此代码不处理不再标记组或首次记录组之前的日期。例如,D 组的第一个条目是在 2020 年 1 月 3 日 0:00 -13 分。这意味着在2020年12月31日至2020年1月3日0:00之间,D组的损失为-13-0=-13(0是因为2020年1月3日0:00是第一个D条目)。
另外,假设某个组不再被记录(即已售出)——例如,2020年1月1日9:00之后的A组,该值应“前置”并取最后记录的A组值并填写到以后的日期。这样,A 在 1/3/2020 18:00 和 12/31/2019 之间的增益为 123 - (-43) = 166。
为了最大限度地提高上面的代码的工作效率,我希望对给定的数据帧进行回填(下图中的蓝色)和前填充(下图中的绿色)。组在首次列出之前应回填为 0。组应“前置”作为最后列出后的最后记录值。
以下是我想要实现的目标:
以下是示例数据帧和我试图通过的简单测试用例:
df.groupby([time_index, group]).sum().groupby(group).diff().groupby(group).cumsum()
Run Code Online (Sandbox Code Playgroud) 我想根据 A 列从 B 列中删除列表中的值,想知道如何。
鉴于:
df = pd.DataFrame({
'A': ['a1', 'a2', 'a3', 'a4'],
'B': [['a1', 'a2'], ['a1', 'a2', 'a3'], ['a1', 'a3'], []]
})
Run Code Online (Sandbox Code Playgroud)
我想要:
result = pd.DataFrame({
'A': ['a1', 'a2', 'a3', 'a4'],
'B': [['a1', 'a2'], ['a1', 'a2', 'a3'], ['a1', 'a3'], []],
'Output': [['a2'], ['a1', 'a3'], ['a1'], []]
})
Run Code Online (Sandbox Code Playgroud) 我在 mongo 集合上有(900k, 300)条记录。当我尝试将数据读取到 Pandas 时,内存消耗会急剧增加,直到进程被终止。1.5GB~如果我从 csv 文件中读取数据,我必须提到数据适合内存()。
我的机器是 32GB RAM 和 16 个 CPU 的 Centos 7。
我的简单代码:
client = MongoClient(host,port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))
Run Code Online (Sandbox Code Playgroud)
我的多处理代码:
def read_mongo_parallel(skipses):
print("Starting process")
client = MongoClient(skipses[4],skipses[5])
db = client[skipses[2]]
collection = db[skipses[3]]
print("range of {} to {}".format(skipses[0],skipses[0]+skipses[1]))
cursor = collection.find().skip(skipses[0]).limit(skipses[1])
return list(cursor)
all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for rows in executor.map(read_mongo_parallel, skipesess):
all_lists.extend(rows)
df = pd.DataFrame(all_lists)
Run Code Online (Sandbox Code Playgroud)
两种方法的内存增加并杀死内核,
我在做什么?