當前位置:首頁 » 編程語言 » python計劃任務

python計劃任務

發布時間: 2023-01-12 12:13:19

① 如何在Windows系統中設置python程序定時運行

首先,你得安裝python的運行環境唄。。 1. 默認的程序要設置,如果不會設置,就重新裝一次python就行 2. 需要設置環境變數:例如你的程序為:C:\Python26,則你需要把這個地址粘貼在:右鍵我的電腦->屬性->高級->環境變數,然後再下邊的 系統變數 選擇框中,雙擊path(大小寫無所謂),在變數值的框中,添加: ;C:\Python26,記得在路徑之前添加分號(;),當然是英文輸入法下的分號 3. 這一點很重要,因為執行的速度很快的時候,cmd窗口是不會自動pause的,所以會一閃而過。。如果只是print之類的語句,或許根本沒有什麼結果留下。。所以,這種還是選擇用IDLE來運行吧。。。

② 如何在Windows系統中設置Python程序定時運行

樓主最近寫了一個簡單的Python程序,想讓這個小程序每天上午10點定時運行一次,因為我的電腦是Windows系統,無法使用linux下強大的crontab命令,所以我只好求助於度娘。我搜到了好幾個相關的網頁,但是看完之後仍然不明白應該怎麼做,最後經過多次推理和嘗試才設置成功。我相信以後還會有人想要知道在Windows下怎麼設置Python程序定時運行,因此把我的經驗寫在這里,希望能幫助到大家。
為了照顧更多的朋友,下面我將每一步都截圖並加以簡單說明,如果覺得太啰嗦可以直接翻到後面看最關鍵的一步如何設置。
首先,我們肯定是要用到Windows下的【計劃任務】功能(註:樓主的系統是Windows8.1,但是Windows7或Windows10情況應該差不多),如下圖所示:
之後點擊右側的【創建基本任務】,輸入任務名稱以及可選的任務描述:
點擊下一步,設置任務的開始時間,這個應該沒什麼難度,我這里設置為每天早上10點運行此計劃任務:
點擊下一步,設置【操作】為【啟動程序】,再點擊下一步,最關鍵的地方來了,這里該怎麼填寫才能保證系統正確地運行Python程序呢?
廢話不多說,先看具體的設置:
這里解釋一下三個文本框內容的含義,【程序或腳本】文本框中填的是Python編譯器的名稱,一般就是python.exe,【起始於】文本框中填的是Python編譯器的目錄,上圖中假設你的Python編譯器的完整路徑是「C:\Python27\python.exe」,【添加參數】文本框中填的是你的Python程序的完整路徑,這里假設在C盤的Users文件夾下面有一個叫做code.py的文件。如果你的Python程序包含命令行參數,將其添加到Python程序的完整路徑之後即可。
相信聰明的讀者已經發現了,如果將這三部分連在一起,就是「C:\Python27\python.exe
C:\Users\code.py」,這其實就是在Windows命令行下輸入「python
C:\Users\code.py」(或「python
code.py」,如果你正好在C:\Users目錄下),只是在計劃任務的設置中需要給出完整的python編譯器的路徑而已。
之後點擊下一步就可以點擊完成了,是不是很簡單?

③ 如何使用python調用命令創建任務計劃

import os
file_path="C:\\Users\\zc\\Desktop\\python\\start.bat"
cmd='schtasks /create /tn "compress_upload" /tr %s /sc daily /st 16:00:00'%file_path
os.popen(cmd)

④ python 運維常用腳本

Python 批量遍歷目錄文件,並修改訪問時間

import os

path = "D:/UASM64/include/"
dirs = os.listdir(path)
temp=[];

for file in dirs:
temp.append(os.path.join(path, file))
for x in temp:
os.utime(x, (1577808000, 1577808000))
Python 實現的自動化伺服器管理

import sys
import os
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user,passwd,port,userfile,cmd):

def ssh_put(user,passwd,source,target):

while True:
try:
shell=str(input("[Shell] # "))
if (shell == ""):
continue
elif (shell == "exit"):
exit()
elif (shell == "put"):
ssh_put("root","123123","./a.py","/root/a.py")
elif (shell =="cron"):
temp=input("輸入一個計劃任務: ")
temp1="(crontab -l; echo "+ temp + ") |crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
elif (shell == "uncron"):
temp=input("輸入要刪除的計劃任務: ")
temp1="crontab -l | grep -v " "+ temp + "|crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
else:
ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)

遍歷目錄和文件

import os

def list_all_files(rootdir):
import os
_files = []
list = os.listdir(rootdir) #列出文件夾下所有的目錄與文件
for i in range(0,len(list)):
path = os.path.join(rootdir,list[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
return _files

a=list_all_files("C:/Users/LyShark/Desktop/a")
print(a)
python檢測指定埠狀態

import socket

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)

for ip in range(0,254):
try:
sk.connect(("192.168.1."+str(ip),443))
print("192.168.1.%d server open "%ip)
except Exception:
print("192.168.1.%d server not open"%ip)

sk.close()

python實現批量執行CMD命令

import sys
import os
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

print("------------------------------> ")
print("使用說明,在當前目錄創建ip.txt寫入ip地址")
print("------------------------------> ")

user=input("輸入用戶名:")
passwd=input("輸入密碼:")
port=input("輸入埠:")
cmd=input("輸入執行的命令:")

file = open("./ip.txt", "r")
line = file.readlines()

for i in range(len(line)):
print("對IP: %s 執行"%line[i].strip(' '))

python3-實現釘釘報警

import requests
import sys
import json

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='

data = {"msgtype": "markdown","markdown": {"title": "監控","text": "apche異常"}}

headers = {'Content-Type':'application/json;charset=UTF-8'}

send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)

import psutil
import requests
import time
import os
import json

monitor_name = set(['httpd','cobblerd']) # 用戶指定監控的服務進程名稱

proc_dict = {}
proc_name = set() # 系統檢測的進程名稱
monitor_map = {
'httpd': 'systemctl restart httpd',
'cobblerd': 'systemctl restart cobblerd' # 系統在進程down掉後,自動重啟
}

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='

while True:
for proc in psutil.process_iter(attrs=['pid','name']):
proc_dict[proc.info['pid']] = proc.info['name']
proc_name.add(proc.info['name'])

判斷指定埠是否開放

import socket

port_number = [135,443,80]

for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
print("Port %d is not open" % index)
sock.close()

判斷指定埠並且實現釘釘輪詢報警

import requests
import sys
import json
import socket
import time

def dingding(title,text):
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)

def net_scan():
port_number = [80,135,443]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
return index
sock.close()

while True:
dingding("Warning",net_scan())
time.sleep(60)

python-實現SSH批量CMD執行命令

import sys
import os
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user,passwd,port,userfile,cmd):
file = open(userfile, "r")
line = file.readlines()
for i in range(len(line)):
print("對IP: %s 執行"%line[i].strip(' '))
ssh.connect(hostname=line[i].strip(' '),port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()

ssh_cmd("lyshark","123","22","./ip.txt","free -h |grep 'Mem:' |awk '{print $3}'")

用python寫一個列舉當前目錄以及所有子目錄下的文件,並列印出絕對路徑

import sys
import os

for root,dirs,files in os.walk("C://"):
for name in files:
print(os.path.join(root,name))
os.walk()

按照這樣的日期格式(xxxx-xx-xx)每日生成一個文件,例如今天生成的文件為2013-09-23.log, 並且把磁碟的使用情況寫到到這個文件中。

import os
import sys
import time

new_time = time.strftime("%Y-%m-%d")
disk_status = os.popen("df -h").readlines()

str1 = ''.join(disk_status)
f = open(new_time+'.log','w')
f.write("%s"%str1)

f.flush()
f.close()

統計出每個IP的訪問量有多少?(從日誌文件中查找)

import sys

list = []

f = open("/var/log/httpd/access_log","r")
str1 = f.readlines()
f.close()

for i in str1:
ip=i.split()[0]
list.append(ip)

list_num=set(list)

for j in list_num:
num=list.count(j)
print("%s -----> %s" %(num,j))

寫個程序,接受用戶輸入數字,並進行校驗,非數字給出錯誤提示,然後重新等待用戶輸入。

import tab
import sys

while True:
try:
num=int(input("輸入數字:").strip())
for x in range(2,num+1):
for y in range(2,x):
if x % y == 0:
break
else:
print(x)
except ValueError:
print("您輸入的不是數字")
except KeyboardInterrupt:
sys.exit(" ")

ps 可以查看進程的內存佔用大小,寫一個腳本計算一下所有進程所佔用內存大小的和。

import sys
import os

list=[]
sum=0

str1=os.popen("ps aux","r").readlines()

for i in str1:
str2=i.split()
new_rss=str2[5]
list.append(new_rss)
for i in list[1:-1]:
num=int(i)
sum=sum+num

print("%s ---> %s"%(list[0],sum))

關於Python 命令行參數argv

import sys

if len(sys.argv) < 2:
print ("沒有輸入任何參數")
sys.exit()

if sys.argv[1].startswith("-"):
option = sys.argv[1][1:]

利用random生成6位數字加字母隨機驗證碼

import sys
import random

rand=[]

for x in range(6):
y=random.randrange(0,5)
if y == 2 or y == 4:
num=random.randrange(0,9)
rand.append(str(num))
else:
temp=random.randrange(65,91)
c=chr(temp)
rand.append(c)
result="".join(rand)
print(result)

自動化-使用pexpect非交互登陸系統

import pexpect
import sys

ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.txt', 'w')
ssh.logfile = fout

ssh.expect("[email protected]'s password:")

ssh.sendline("密碼")
ssh.expect('#')

ssh.sendline('ls /home')
ssh.expect('#')

Python-取系統時間

import sys
import time

time_str = time.strftime("日期:%Y-%m-%d",time.localtime())
print(time_str)

time_str= time.strftime("時間:%H:%M",time.localtime())
print(time_str)

psutil-獲取內存使用情況

import sys
import os
import psutil

memory_convent = 1024 * 1024
mem =psutil.virtual_memory()

print("內存容量為:"+str(mem.total/(memory_convent))+"MB ")
print("已使用內存:"+str(mem.used/(memory_convent))+"MB ")
print("可用內存:"+str(mem.total/(memory_convent)-mem.used/(1024*1024))+"MB ")
print("buffer容量:"+str(mem.buffers/( memory_convent ))+"MB ")
print("cache容量:"+str(mem.cached/(memory_convent))+"MB ")

Python-通過SNMP協議監控CPU
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import os

def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split(' ')[:-1]
return sn1

def getDate(host):
items = getAllitems(host, '.1.3.6.1.4.1.2021.11')

if name == ' main ':

Python-通過SNMP協議監控系統負載
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import os
import sys

def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split(' ')
return sn1

def getload(host,loid):
load_oids = Ƈ.3.6.1.4.1.2021.10.1.3.' + str(loid)
return getAllitems(host,load_oids)[0].split(':')[3]

if name == ' main ':

Python-通過SNMP協議監控內存
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import os

def getAllitems(host, oid):

def getSwapTotal(host):

def getSwapUsed(host):

def getMemTotal(host):

def getMemUsed(host):

if name == ' main ':

Python-通過SNMP協議監控磁碟
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import re
import os

def getAllitems(host,oid):

def getDate(source,newitem):

def getRealDate(item1,item2,listname):

def caculateDiskUsedRate(host):

if name == ' main ':

Python-通過SNMP協議監控網卡流量
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import re
import os

def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split(' ')[:-1]
return sn1

def getDevices(host):
device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')
device_list = []

def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
date = []

if name == ' main ':

Python-實現多級菜單

import os
import sys

ps="[None]->"
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
flage=1

while True:
ps="[None]->"
temp=input(ps)
if (temp=="test"):
print("test page !!!!")
elif(temp=="user"):
while (flage == 1):
ps="[User]->"
temp1=input(ps)
if(temp1 =="exit"):
flage=0
break
elif(temp1=="show"):
for i in range(len(ip)):
print(i)

Python實現一個沒用的東西

import sys

ps="[root@localhost]# "
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]

while True:
temp=input(ps)
temp1=temp.split()

檢查各個進程讀寫的磁碟IO

import sys
import os
import time
import signal
import re

class DiskIO:
def init (self, pname=None, pid=None, reads=0, writes=0):
self.pname = pname
self.pid = pid
self.reads = 0
self.writes = 0

def main():
argc = len(sys.argv)
if argc != 1:
print ("usage: please run this script like [./lyshark.py]")
sys.exit(0)
if os.getuid() != 0:
print ("Error: This script must be run as root")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system('echo 1 > /proc/sys/vm/block_mp')
print ("TASK PID READ WRITE")
while True:
os.system('dmesg -c > /tmp/diskio.log')
l = []
f = open('/tmp/diskio.log', 'r')
line = f.readline()
while line:
m = re.match(
'^(S+)(d+)(d+): (READ|WRITE) block (d+) on (S+)', line)
if m != None:
if not l:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
continue
found = False
for item in l:
if item.pid == m.group(2):
found = True
if m.group(3) == "READ":
item.reads = item.reads + 1
elif m.group(3) == "WRITE":
item.writes = item.writes + 1
if not found:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
time.sleep(1)
for item in l:
print ("%-10s %10s %10d %10d" %
(item.pname, item.pid, item.reads, item.writes))
def signal_handler(signal, frame):
os.system('echo 0 > /proc/sys/vm/block_mp')
sys.exit(0)

if name ==" main ":
main()

利用Pexpect實現自動非交互登陸linux

import pexpect
import sys

ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.log', 'w')
ssh.logfile = fout

ssh.expect("[email protected]'s password:")

ssh.sendline("密碼")

ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')

利用psutil模塊獲取系統的各種統計信息

import sys
import psutil
import time
import os

time_str = time.strftime( "%Y-%m-%d", time.localtime( ) )
file_name = "./" + time_str + ".log"

if os.path.exists ( file_name ) == False :
os.mknod( file_name )
handle = open ( file_name , "w" )
else :
handle = open ( file_name , "a" )

if len( sys.argv ) == 1 :
print_type = 1
else :
print_type = 2

def isset ( list_arr , name ) :
if name in list_arr :
return True
else :
return False

print_str = "";

if ( print_type == 1 ) or isset( sys.argv,"mem" ) :
memory_convent = 1024 * 1024
mem = psutil.virtual_memory()
print_str += " 內存狀態如下: "
print_str = print_str + " 系統的內存容量為: "+str( mem.total/( memory_convent ) ) + " MB "
print_str = print_str + " 系統的內存以使用容量為: "+str( mem.used/( memory_convent ) ) + " MB "
print_str = print_str + " 系統可用的內存容量為: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB "
print_str = print_str + " 內存的buffer容量為: "+str( mem.buffers/( memory_convent ) ) + " MB "
print_str = print_str + " 內存的cache容量為:" +str( mem.cached/( memory_convent ) ) + " MB "

if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :
print_str += " CPU狀態如下: "
cpu_status = psutil.cpu_times()
print_str = print_str + " user = " + str( cpu_status.user ) + " "
print_str = print_str + " nice = " + str( cpu_status.nice ) + " "
print_str = print_str + " system = " + str( cpu_status.system ) + " "
print_str = print_str + " idle = " + str ( cpu_status.idle ) + " "
print_str = print_str + " iowait = " + str ( cpu_status.iowait ) + " "
print_str = print_str + " irq = " + str( cpu_status.irq ) + " "
print_str = print_str + " softirq = " + str ( cpu_status.softirq ) + " "
print_str = print_str + " steal = " + str ( cpu_status.steal ) + " "
print_str = print_str + " guest = " + str ( cpu_status.guest ) + " "

if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :
print_str += " 硬碟信息如下: "
disk_status = psutil.disk_partitions()
for item in disk_status :
print_str = print_str + " "+ str( item ) + " "

if ( print_type == 1 ) or isset ( sys.argv,"user" ) :
print_str += " 登錄用戶信息如下: "
user_status = psutil.users()
for item in user_status :
print_str = print_str + " "+ str( item ) + " "

print_str += "--------------------------------------------------------------- "
print ( print_str )
handle.write( print_str )
handle.close()

import psutil

mem = psutil.virtual_memory()
print mem.total,mem.used,mem
print psutil.swap_memory() # 輸出獲取SWAP分區信息

cpu = psutil.cpu_stats()
printcpu.interrupts,cpu.ctx_switches

psutil.cpu_times(percpu=True) # 輸出每個核心的詳細CPU信息
psutil.cpu_times().user # 獲取CPU的單項數據 [用戶態CPU的數據]
psutil.cpu_count() # 獲取CPU邏輯核心數,默認logical=True
psutil.cpu_count(logical=False) # 獲取CPU物理核心數

psutil.disk_partitions() # 列出全部的分區信息
psutil.disk_usage('/') # 顯示出指定的掛載點情況【位元組為單位】
psutil.disk_io_counters() # 磁碟總的IO個數
psutil.disk_io_counters(perdisk=True) # 獲取單個分區IO個數

psutil.net_io_counter() 獲取網路總的IO,默認參數pernic=False
psutil.net_io_counter(pernic=Ture)獲取網路各個網卡的IO

psutil.pids() # 列出所有進程的pid號
p = psutil.Process(2047)
p.name() 列出進程名稱
p.exe() 列出進程bin路徑
p.cwd() 列出進程工作目錄的絕對路徑
p.status()進程當前狀態[sleep等狀態]
p.create_time() 進程創建的時間 [時間戳格式]
p.uids()
p.gids()
p.cputimes() 【進程的CPU時間,包括用戶態、內核態】
p.cpu_affinity() # 顯示CPU親緣關系
p.memory_percent() 進程內存利用率
p.meminfo() 進程的RSS、VMS信息
p.io_counters() 進程IO信息,包括讀寫IO數及位元組數
p.connections() 返回打開進程socket的nametples列表
p.num_threads() 進程打開的線程數

import psutil
from subprocess import PIPE
p =psutil.Popen(["/usr/bin/python" ,"-c","print 'helloworld'"],stdout=PIPE)
p.name()
p.username()
p.communicate()
p.cpu_times()

psutil.users() # 顯示當前登錄的用戶,和Linux的who命令差不多

psutil.boot_time() 結果是個UNIX時間戳,下面我們來轉換它為標准時間格式,如下:
datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的結果不是str格式,繼續進行轉換 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d%H:%M:%S')

Python生成一個隨機密碼

import random, string
def GenPassword(length):

if name == ' main ':
print (GenPassword(6))

⑤ Python寫每隔一段時間就運行一次的任務,怎麼寫比較好

# coding=utf-8# using python2.7import timestart_job = Truewhile start_job: # do something here. print 1 # 這里改成你想做的任務 time.sleep(60) # 每隔一分鍾執行一次print任務, sleep里的參數按秒算

⑥ win10 任務計劃程序 定時運行 python 或 bat

1. 先找到python

2. 保證 .bat 文件和其中要運行的 .py 是 UTF-8 格式
3. 查看日誌,可打開右側「啟用所有任務歷史記錄」
    歷史記錄(已禁用)變為歷史記錄
4. 定時按需設成每日等

⑦ Python RabbitMQ原理和使用場景以及模式

客戶端通過 TCP 連接到 RabbitMQ Server。
連接成功後 RabbitMQ 會創建一個 AMQP 信道。
信道是創建在 TCP 上的虛擬連接,AMQP 命令都是通過信道發送出去的,每個信道都會有一個唯一的 ID,不論是發布消息,訂閱隊列或者介紹消息都是通過信道完成的。

設置如下:

只做隊列持久化是不行的還需要在加上消息持久化

消息生產者

消息消費者

:在修改消費者的 routing_key 後,需要重新創建隊列。

消息生產者

在生產者中只需要修改 exchange_declare 他的 type 為 fanout 即可

消息消費者

例如:
接收者(消費者) routing_key='a.*'
發送者(生產者) routing_key = 'a.b.c.d'
結果:匹配失敗

因為:
* 表示匹配一個單詞
# 表示匹配0個或多個單詞

消息生產者

消息消費者

消息生產者

消息消費者

三個任務,任務1、任務2、任務3。他們之間有相互的制約。執行 任務1 的前提是要有 任務2 的結果。執行 任務2 需要 任務3 的結果。

一般會使用 crontab 來做計劃任務。預估一下每個任務的完成時間。然後制定任務。當數量變大處理時間變成,就需要經常修改 crontab 任務。

使用 RabbitMQ 後每個任務結束時只需要發送一個結束信息即可

如: 任務2 訂閱 任務3 的信息。當 任務3 完成後發送一個完成消息。 任務2 接收到完成消息後開始執行,在執行結束後發送 任務2 完成消息。 任務1 訂閱 任務2 的消息,然後執行。

有三個用戶A、B、C 他們分別發送文章,但後台會根據用戶的級別做不同的操作。
A 是普通用戶,系統發布。
B 是 VIP 用戶,系統發布和推薦給關注這部分內容的客戶。
C 是黑卡客戶,系統發布、推薦給關注這部分內容的客戶、在這個分類中置頂這篇文章。
文章發布服務只關心是否成功,剩下的操作都不關心。可以使用 RabbitMQ 服務將其他操作分離。

有一個操作 A。用戶執行這個任務後,又非常需要結果。

為什麼不是後台直接將結果發送給用戶呢。因為一旦增加了訂閱用戶,就需要修改後台程序,這樣很惡心。

RabbitMQ 自帶管理後台,安裝後需要配置開啟

進入 RabbitMQ 安裝目錄中的 sbin 目錄執行

http://localhost:15672/

用戶名密碼均為guest

⑧ 用Windows的「任務計劃程序」執行Python腳本文件

整個任務計劃設置一路下一步就ok了,幾乎Xp之後每個版本的Windows設置都是差不多的。稍微提一下, 創建基本任務 創建任務 的區別是一個有引導界面,一個直接彈出設置界面直接設置。

出問題的是在這個界面的設置方法:

這里先引用一段其他文章里的設置方法:

上面的設置里問題還是挺多的,特別是關於 起始於 的解釋。事實上這里的起始於並不是填寫編譯器所在的目錄,編譯器所在的目錄實際上應該是在 程序與腳本 中就已經填寫過了。而這里 起始於 要填寫的內容應該是

同時,我們注意到,這里的選項後面有一個 (可選) ,意思是如果此處不填的話(預設狀態),那麼自動將該處的目錄位置定位到起始執行目錄的位置(在 任務計劃程序 執行的過程中通常是 編譯器所在的目錄位置 )。

其實這個跟python的一個內置模塊有關。
os模塊下我們經常用到的os.getcwd(),用於獲取腳本當前目錄位置;
而os.path.pardir可以獲得當前腳本目錄的上一級目錄。
他們在一般雙擊運行的情況下的輸出為(舉例):
D:Python
..
而當使用編譯器運行python腳本和直接雙擊執行python腳本時,os.getcwd()的輸出結果是不一樣的:
D:Python
C:
換句話說,在使用編譯器執行時(即在cmd中執行 python D:Pythonpath.py ),os.getcwd()取的是python編譯器所在的目錄。 更一般的講 ,os.getcwd()本身這個方法,返回的是當前執行python文件的文件目錄, os.path.pardir同理 。一般情況下,打開腳本時,cmd會自動定位到當前腳本所在位置,類似使用了命令 cd D:Python ,所以執行目錄就在腳本位置,所以輸出 D:Python ;而當用 任務計劃程序 執行時,其本質就是在cmd中將參數拼接,形成 python D:Pythonpath.py 這樣的一條命令來運行程序,所以此時他返回的python的執行文件目錄,就是python的安裝目錄 C: 。

因此我們需要定義 起始於 這個參數來告訴Windows實際的腳本位置在哪裡,否則就會出現找不到目標文件的錯誤。如果腳本中沒有牽涉到類似的目錄位置等情況時(實際情況是這樣的定時腳本大概率會遇到文件輸入輸出問題),此項大可以不填。

現在重新對這里的設置進行總結:

⑨ python讀取windows定時任務

1、准備好Python腳本py文件,放置在一個文件夾下。
2、創建一個txt文件,並把文件類型(後綴)改為bat、編寫文件內容。
3、Win鍵+R,輸入compmgmt,msc,調出計算機管理窗口。點擊左側任務計劃程序,再點擊右側創建基本任務。
4、填寫任務的基本信息,選擇腳本執行的觸發器。
5、選擇具體時間,選擇執行的操作、選擇要執行的腳本,創建完成。

熱點內容
配置管理需要會什麼 發布:2025-07-14 01:35:35 瀏覽:372
去除頭條中的緩存 發布:2025-07-14 01:27:38 瀏覽:783
php開啟錯誤 發布:2025-07-14 01:16:49 瀏覽:998
esp資料庫 發布:2025-07-14 01:16:44 瀏覽:980
python查找文件路徑 發布:2025-07-14 01:16:03 瀏覽:514
phpapachetomcat 發布:2025-07-14 01:08:41 瀏覽:123
伺服器運維看什麼書 發布:2025-07-14 01:07:32 瀏覽:988
密碼器動態密碼怎麼弄 發布:2025-07-14 00:44:27 瀏覽:386
小米怎麼把視頻加密 發布:2025-07-14 00:42:59 瀏覽:406
在線申訴找回密碼根本什麼都沒有 發布:2025-07-14 00:41:22 瀏覽:306