我在 pyspark 数据框中有以下列,类型为 Array[Int]。
+--------------------+
| feature_indices|
+--------------------+
| [0]|
|[0, 1, 4, 10, 11,...|
| [0, 1, 2]|
| [1]|
| [0]|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
我试图用零填充数组,然后限制列表长度,以便每行数组的长度相同。例如,对于 n = 5,我期望:
+--------------------+
| feature_indices|
+--------------------+
| [0, 0, 0, 0, 0]|
| [0, 1, 4, 10, 11]|
| [0, 1, 2, 0, 0]|
| [1, 0, 0, 0, 0]|
| [0, 0, 0, 0, 0]|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
有什么建议么?我查看了 pysparkrpad函数,但它仅对字符串类型列进行操作。
我正在从 kafka 主题读取流数据,我想将其中的某些部分存储在 pandas 数据框中。
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': "###",
'group.id': '###',
'default.topic.config': {
'auto.offset.reset': 'latest' }
})
c.subscribe(['scorestore'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
Run Code Online (Sandbox Code Playgroud)
收到的消息是一个json
{
"messageHeader" : {
"messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
"timestamp" : 152520000,
"sourceHost" : "test",
"sourceLocation" : "test",
"tags" : [ ],
"version" : "1.0"
},
"id_value" : {
"id" : "1234",
"value" …Run Code Online (Sandbox Code Playgroud)