I have a very large pandas DataFrame and I want to create a column containing the number of seconds since the epoch, computed from ISO 8601-formatted date strings.

I initially used the standard Python library, but it turned out to be slow. I then tried calling the POSIX C library functions strptime and mktime directly instead, but I haven't been able to get the time conversion to produce the correct answer.
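Roughly, the pure-Python approach I started from is a per-row apply of the standard-library parsing functions over the column; a minimal sketch (the DataFrame and column name below are made up for illustration):

import time
import numpy as np
import pandas as pd

# Made-up example frame; the real DataFrame is far larger.
df = pd.DataFrame({"date": ["2015-01-01", "2015-06-15", "2015-12-31"]})

# Parse each string with the standard library and convert to seconds since
# the epoch; this per-row Python work is what gets slow at scale.
df["epoch"] = df["date"].apply(
    lambda s: np.uint32(time.mktime(time.strptime(s, "%Y-%m-%d")))
)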
Here is the Cython code (run in an IPython window):
%load_ext cythonmagic

%%cython
from posix.types cimport time_t
cimport numpy as np
import numpy as np
import time

cdef extern from "sys/time.h" nogil:
    struct tm:
        int tm_sec
        int tm_min
        int tm_hour
        int tm_mday
        int tm_mon
        int tm_year
        int tm_wday
        int tm_yday
        int tm_isdst
    time_t mktime(tm *timeptr)
    char *strptime(const char *s, const char *format, tm *tm)

cdef to_epoch_c(const char *date_text):
    cdef tm time_val
    strptime(date_text, "%Y-%m-%d", &time_val)
    return <unsigned int>mktime(&time_val)

cdef to_epoch_py(const char *date_text):
    return np.uint32(time.mktime(time.strptime(date_text, …

I am running a Kafka cluster of six brokers with the mesos/kafka library. I can add and start brokers on six different machines, and I can publish messages to the cluster with both the Python SimpleProducer and the kafka-console-producer.sh script.
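For reference, the producer side that does work is roughly the following (a sketch using kafka-python's SimpleProducer; the broker address is an assumption, the real endpoints are whatever the mesos/kafka scheduler assigned, and the exact argument types expected by send_messages vary between kafka-python versions):

from kafka import KafkaClient, SimpleProducer

# Broker address is an assumption; substitute any of the six brokers
# started by the mesos/kafka scheduler.
client = KafkaClient("192.168.1.199:9092")
producer = SimpleProducer(client)

# Publish a test message to the same "test" topic the consumer reads from.
producer.send_messages("test", b"{'some':2}")
client.close()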
However, I cannot get a consumer to work properly. I am running the following consumer command:
bin/kafka-console-consumer.sh --zookeeper 192.168.1.199:2181 --topic test --from-beginning --consumer.config config/consumer.properties --delete-consumer-offsets
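For context, the config/consumer.properties passed to the consumer looks roughly like this (a sketch; group.id matches the description below, and the ZooKeeper host list is an assumption):

group.id=my.group
zookeeper.connect=192.168.1.199:2181,192.168.1.200:2181,192.168.1.201:2181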
In that consumer.properties file I have group.id set to my.group and zookeeper.connect set to several nodes of the ZooKeeper ensemble. Running this consumer produces the following warning:
[2015-09-24 16:01:06,609] WARN [my.group_my_host-1443106865779-b5a3a1e1-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
{'some':2}
[2015-09-24 …