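I use spatial data in R for business applications and would like to use ggplot2 for data visualization. If you run Hadley's example at https://github.com/hadley/ggplot2/wiki/plotting-polygon-shapefiles, you will find that in order to run the fortify command you need to enable the gpclib tools with gpclibPermit().
I am looking for an efficient way (one that does not involve manually hacking into the S4 object) to do the same thing as fortify, i.e. take a spatial polygons object and convert it into a regular data frame whose rows contain latitude, longitude, and polygon id.
Has anyone else solved this?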
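To make the target layout concrete, here is a minimal, language-neutral sketch in Python rather than R. It is not a replacement for fortify and does not touch S4 objects; the `polygons` mapping and the `flatten_polygons` helper are hypothetical stand-ins that only illustrate the "one row per vertex, with a polygon id" shape described above.

```python
# Hypothetical stand-in for a spatial polygons object: a mapping from
# polygon id to an ordered list of (longitude, latitude) vertices.
polygons = {
    "A": [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)],
    "B": [(2.0, 2.0), (3.0, 2.0), (2.0, 2.0)],
}


def flatten_polygons(polys):
    """Return one row per vertex: long, lat, drawing order, polygon id."""
    rows = []
    for poly_id, ring in polys.items():
        # Keep the vertex order so the outline can be redrawn correctly.
        for order, (lon, lat) in enumerate(ring, start=1):
            rows.append({"long": lon, "lat": lat, "order": order, "id": poly_id})
    return rows


rows = flatten_polygons(polygons)
```

Each entry in `rows` corresponds to one row of the desired data frame; the `order` column is an assumption added here so that vertices can be plotted in sequence.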
I have a super simple test DAG that looks like this:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
    dag_id='scheduler_test_dag',
    start_date=datetime(2017, 9, 9, 4, 0, 0, 0),  # EC2 time; equal to 11pm Mexico time
    max_active_runs=1,
    schedule_interval='@once'  # externally triggered
)


def ticker_function():
    # Append the current timestamp to a file each time the task runs
    with open('/tmp/ticker', 'a') as outfile:
        outfile.write('{}\n'.format(datetime.now()))


time_ticker = PythonOperator(
    task_id='time_ticker',
    python_callable=ticker_function,
    dag=DAG
)
Since upgrading to apache-airflow v1.9, this DAG hangs and never runs. Digging into the scheduler logs, I found this error trace:
[2018-02-12 17:03:06,259] {jobs.py:1754} INFO - DAG(s) dict_keys(['scheduler_test_dag']) retrieved from /home/ubuntu/airflow/dags/scheduler_test_dag.py
[2018-02-12 17:03:06,315] {jobs.py:1386} INFO - Processing scheduler_test_dag
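[2018-02-12 17:03:06,320] {jobs.py:379} ERROR - Got an exception! …
I am running Airflow on an EC2 instance using the LocalScheduler option. I have invoked airflow scheduler and airflow webserver, and everything seems to be running fine. That said, after supplying the cron string '*/10 * * * *' to schedule_interval to mean "run every 10 minutes", the job continues to execute every 24 hours, which is the default. Here is the header of the code: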
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import workers
else:
    print('Define PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime(2017, 9, 9, 10, 0, 0, 0),  # EC2 time; equal to 11pm Mexico time
    'max_active_runs': 1,
    'concurrency': 4,
    'schedule_interval': '*/10 * * …
All,
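I have just started playing with the Julia language and I am enjoying it. At the end of the third tutorial there is an interesting problem: generalize the quadratic formula so that it solves for the roots of any nth-order polynomial equation.
This struck me as (a) an interesting programming problem and (b) an interesting Julia problem. Has anyone solved this? For reference, here is the Julia code with a couple of toy examples. Again, the idea is to make this generic for any nth-order polynomial.
Cheers,
Aaron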
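function derivative(f)
    return function(x)
        # pick a small value for h
        h = x == 0 ? sqrt(eps(Float64)) : sqrt(eps(Float64)) * x

        # floating point arithmetic gymnastics
        xph = x + h
        dx = xph - x

        # evaluate f at x + h
        f1 = f(xph)

        # evaluate f at x
        f0 = f(x)

        # divide the difference by h
        return (f1 - f0) / dx
    end
end

function quadratic(f)
    f1 = …
Consider the following DAG example, in which the first task, get_id_creds, extracts a list of credentials from a database. This operation tells me which users in the database I can run further data preprocessing on, and it writes those IDs to the file /tmp/ids.txt. I then scan those IDs into my DAG and use them to generate a list of upload_transaction tasks that can run in parallel.
My question is: is there a more correct, more dynamic way to do this with Airflow? What I have here feels clumsy and brittle. How can I pass the list of valid IDs directly from one process to the one that defines the subsequent downstream processes?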
from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime.now(),
    'schedule_interval': None
}

DAG = DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)

# Read the IDs written by get_id_creds; one ID per line
with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    upload_transactions = PythonOperator( …
This sequence:
from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)
produces the following:
UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)
This sequence works fine:
from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)
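The key is declaring the charset explicitly. I tried to do the same in Airflow by setting {"charset": "utf8"} in the connection settings, but this did not fix the error. I have restarted my development environment since making the change, and the admin panel tells me the edit was successful. How do I get Airflow to connect with utf8 as my charset?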
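As a small illustration of the working sequence, the sketch below assembles the same kind of URL with the charset carried in the query string. It uses only the Python standard library; `build_mysql_url` and the credentials are hypothetical, and the sketch does not show how (or whether) a given Airflow hook forwards a charset declared elsewhere.

```python
from urllib.parse import quote_plus, urlencode


def build_mysql_url(user, pwd, host, database, charset="utf8"):
    """Assemble a mysql:// URL whose query string carries the charset."""
    query = urlencode({"charset": charset})
    # Escape user and password so characters like '@' or '/' do not
    # break the URL structure.
    return "mysql://{0}:{1}@{2}/{3}?{4}".format(
        quote_plus(user), quote_plus(pwd), host, database, query
    )


# Placeholder credentials, for illustration only.
url = build_mysql_url("etl_user", "p@ss/word", "db.example.com", "capone")
print(url)
```

The resulting string can be handed to create_engine in the same way as the formatted URL in the working sequence; the only difference is that special characters in the credentials are escaped.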
All,
I am looking for a reliable, unsupervised way to detect change points in relatively short vectors. Consider the following two examples:
v1 = c(0.299584,0.314446,0.357783,0.388896,0.410417,0.427182,0.450383,0.466671,0.474884,0.474749,0.493566,0.500374,0.522482,0.529851,0.538387,0.577901,0.610939,0.639383,0.662433,0.692656,0.720543,0.738255,0.748055,0.7591,0.770595,0.781811,0.794479,0.794588,0.789448,0.77667,0.765406,0.75152,0.740408,0.726898,0.720766,0.709445,0.69896,0.687508,0.673382,0.65795,0.639214,0.620445,0.590047,0.561773,0.526807,0.486848,0.439681,0.387545,0.313369,0.282872,0.279908,0.271836,0.269088,0.262727,0.259782)
v2 = c(0.081309,0.206263,0.429069,0.511859,0.565194,0.578792,0.56919,0.51985,0.432563,0.193907,0.0771,0.086603,0.18303,0.177608,0.169706,0.260917,0.292062,0.2979,0.263249,0.270576,0.250422,0.25219,0.182878,0.080623,0.079443,0.088944,0.087623,0.126403,0.155563,0.273942,0.312054,0.370195,0.357087,0.336452,0.300574,0.243105,0.243105,0.25593,0.227401,0.218047,0.15857,0.157727,0.139801,0.125742,0.129142,0.142166,0.142166,0.136748,0.107755,0.064377,0.072801,0.060093,0.103441,0.111704,0.124544)
If you look at
plot(v1,type='l')
and
plot(v2,type='l')
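you can see that for v1 I want to detect the change near index = 28, and for v2 I want to detect changes at index values 8, 11, 18, 25, 32, and 51. So far I have experimented with a Bayesian change point algorithm, which does a good job of identifying where inflection points are likely to occur (regions of low posterior probability) but still forces me to rely on visual inspection to make the final determination: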
install.packages('bcp')
library(bcp)
test = bcp(v1,w0=0.2,p0=0.01)
plot(v1,type='l')
par(new=TRUE)
plot(test$posterior.prob,type='l',col=2)
test = bcp(v2,w0=0.2,p0=0.01)
plot(v2,type='l')
par(new=TRUE)
plot(test$posterior.prob,type='l',col=2)
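Is there a way to automatically select estimates of multiple change points in this kind of data, unsupervised? Maybe I am searching in vain for a substitute for human intuition :P I also looked into the changepoint package, but it does not seem to be designed for this kind of data.
Thanks,
Aaron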
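One possible way to replace the visual inspection step is to threshold a per-index probability vector and keep only well-separated local maxima. The sketch below is a plain heuristic written in Python for illustration; it is not part of bcp or changepoint. `pick_changepoints`, `threshold`, and `min_gap` are hypothetical names and tuning parameters, and the input is assumed to be a list of change probabilities exported from something like test$posterior.prob.

```python
def pick_changepoints(posterior, threshold=0.5, min_gap=3):
    """Return 1-based indices of local maxima in `posterior` above `threshold`.

    Peaks closer than `min_gap` positions to a stronger peak are dropped.
    """
    # Candidate peaks: above threshold and not lower than either neighbour.
    candidates = []
    for i, p in enumerate(posterior):
        left = posterior[i - 1] if i > 0 else float("-inf")
        right = posterior[i + 1] if i < len(posterior) - 1 else float("-inf")
        if p >= threshold and p >= left and p > right:
            candidates.append(i)

    # Visit the strongest peaks first, discarding neighbours that are too close.
    kept = []
    for i in sorted(candidates, key=lambda j: posterior[j], reverse=True):
        if all(abs(i - k) >= min_gap for k in kept):
            kept.append(i)
    return sorted(k + 1 for k in kept)  # R-style 1-based positions


# Made-up probabilities, for illustration only.
example = [0.0, 0.1, 0.9, 0.2, 0.0, 0.6, 0.7, 0.1, 0.0, 0.05]
print(pick_changepoints(example))  # [3, 7]
```

The result still depends on the choice of `threshold` and `min_gap`, so this moves the judgment call into two parameters rather than removing it.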
I have the following DAG, which uses a class dedicated to data preprocessing routines to execute different methods:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    from table_builder import OnlineOfflinePreprocess
else:
    print('Define MARKETING_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime.now(),
    'max_active_runs': 1,
    'concurrency': 4
}

worker = OnlineOfflinePreprocess()

DAG = DAG(
    dag_id='marketing_data_preproc',
    default_args=default_args,
    start_date=datetime.today()
)

import_online_data = PythonOperator(
    task_id='import_online_data',
    python_callable=worker.import_online_data,
    dag=DAG)

import_offline_data = PythonOperator(
    task_id='import_offline_data',
    python_callable=worker.import_offline_data,
    dag=DAG)

merge_aurum_to_sherlock = PythonOperator(
    task_id='merge_aurum_to_sherlock',
    python_callable=worker.merge_aurum_to_sherlock,
    dag=DAG)

merge_sherlock_to_aurum = PythonOperator(
    task_id='merge_sherlock_to_aurum', …
I have the following model:
from enum import Enum

from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType
from sqlalchemy import (
    Column,
    Integer,
    Float,
    Enum as SQAEnum,
    String
)

from backend.db import Base
from backend.config import Config


class AuthProviders(Enum):
    google = "Google"
    facebook = "Facebook"
    vest = "Vest"


class Users(Base):

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True, index=True)
    hashed_password = Column(StringEncryptedType(String(255), Config.MYSQL_ENCRYPT_KEY))
    auth_provider = Column(SQAEnum(AuthProviders))
    timestamp = Column(Float(precision=32))
from which alembic generates the following migration script:
from alembic import op
import sqlalchemy as sa
from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType
from backend.config …
I am running this DAG. It imports functions from dash_workers.py (not included yet; would that help?) and implements those functions as tasks defined by PythonOperator. I am using Airflow version 1.8.0:
from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime(2017, 7, 18),
    'schedule_interval': None
}

DAG = af_models.DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)

# Read the IDs written by get_id_creds; one ID per line
with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids: …
python ×7
airflow ×6
mysql ×2
python-3.x ×2
r ×2
alembic ×1
encryption ×1
ggplot2 ×1
gis ×1
julia ×1
r-maptools ×1
s4 ×1
sqlalchemy ×1