小编dpo*_*man的帖子

Pyspark:用零填充数组 [Int] 列

我在 pyspark 数据框中有以下列,类型为 Array[Int]。

+--------------------+
|     feature_indices|
+--------------------+
|                 [0]|
|[0, 1, 4, 10, 11,...|
|           [0, 1, 2]|
|                 [1]|
|                 [0]|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

我试图用零填充数组,然后限制列表长度,以便每行数组的长度相同。例如,对于 n = 5,我期望:

+--------------------+
|     feature_indices|
+--------------------+
|     [0, 0, 0, 0, 0]|
|   [0, 1, 4, 10, 11]|
|     [0, 1, 2, 0, 0]|
|     [1, 0, 0, 0, 0]|
|     [0, 0, 0, 0, 0]|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

有什么建议么?我查看了 pysparkrpad函数,但它仅对字符串类型列进行操作。

python dataframe pyspark

6
推荐指数
1
解决办法
3007
查看次数

没有 Spark 的 Kafka 到 Pandas 数据框

我正在从 kafka 主题读取流数据,我想将其中的某些部分存储在 pandas 数据框中。

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': "###",
    'group.id': '###',
    'default.topic.config': {
'auto.offset.reset': 'latest' }
})

c.subscribe(['scorestore'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
Run Code Online (Sandbox Code Playgroud)

收到的消息是一个json

{
  "messageHeader" : {
    "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
    "timestamp" : 152520000,
    "sourceHost" : "test",
    "sourceLocation" : "test",
    "tags" : [ ],
    "version" : "1.0"
  },
  "id_value" : {
    "id" : "1234",
    "value" …
Run Code Online (Sandbox Code Playgroud)

python json pandas apache-kafka

5
推荐指数
1
解决办法
7826
查看次数

标签 统计

python ×2

apache-kafka ×1

dataframe ×1

json ×1

pandas ×1

pyspark ×1