Merge branch 'linting' of framasky/masto-image-bot into master
This commit is contained in:
commit
0063745171
292
bot.py
292
bot.py
|
@ -18,11 +18,11 @@ secrets_filepath = get_parameter("secrets_filepath", config_file)
|
|||
log_filepath = get_parameter("log_filepath", config_file)
|
||||
blacklist_filepath = get_parameter("blacklist_filepath", config_file)
|
||||
collection_filepath = get_parameter("collection_filepath", config_file)
|
||||
log = init_log(log_filepath)
|
||||
mastodon = init_mastodon(config_file, secrets_filepath)
|
||||
log = init_log(log_filepath)
|
||||
mastodon = init_mastodon(config_file, secrets_filepath)
|
||||
|
||||
blacklist_file = open(blacklist_filepath,'r')
|
||||
BLACKLIST = json.loads(blacklist_file.read())
|
||||
BLACKLIST = json.loads(blacklist_file.read())
|
||||
blacklist_file.close()
|
||||
|
||||
mime_dict = {'.jpg': 'image/jpeg', '.jpe': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif'}
|
||||
|
@ -39,14 +39,14 @@ def post_img_local(mastodon, text, log, config):
|
|||
if ".zip" not in file:
|
||||
continu = False
|
||||
|
||||
im = Image.open(img_path+ file)
|
||||
im = Image.open(img_path+ file)
|
||||
width, height = im.size
|
||||
NEW_WIDTH = 2048
|
||||
if width > 2048:
|
||||
difference_percent = NEW_WIDTH / width
|
||||
new_height = height * difference_percent
|
||||
size = new_height, NEW_WIDTH
|
||||
im = im.resize((int(NEW_WIDTH), int(new_height)))
|
||||
new_height = height * difference_percent
|
||||
size = new_height, NEW_WIDTH
|
||||
im = im.resize((int(NEW_WIDTH), int(new_height)))
|
||||
im.save('resize_img.jpg')
|
||||
file = "resize_img.jpg"
|
||||
shutil.copyfile(file, "/tmp/"+file)
|
||||
|
@ -55,7 +55,7 @@ def post_img_local(mastodon, text, log, config):
|
|||
shutil.copyfile(img_path+file, "/tmp/"+file)
|
||||
|
||||
image_byte = open("/tmp/"+file, "rb").read()
|
||||
file, ext = os.path.splitext(file)
|
||||
file, ext = os.path.splitext(file)
|
||||
os.remove("/tmp/"+file+ext)
|
||||
|
||||
try:
|
||||
|
@ -72,175 +72,175 @@ def post_img_distant(mastodon, text, log, config):
|
|||
|
||||
collection_url = get_parameter("collection_url", config)
|
||||
collecion_file = open(collection_filepath,'r')
|
||||
collections = json.loads(collecion_file.read())
|
||||
collections = json.loads(collecion_file.read())
|
||||
collecion_file.close()
|
||||
|
||||
count_collection = len(collections)-1
|
||||
id_collection = randint(0,count_collection)
|
||||
id_collection = randint(0,count_collection)
|
||||
|
||||
|
||||
collection_url = collection_url.replace("<collection>", str(collections[id_collection]))
|
||||
response = requests.get(collection_url)
|
||||
pattern = Image.open(BytesIO(response.content), "r").convert('RGB')
|
||||
response = requests.get(collection_url)
|
||||
pattern = Image.open(BytesIO(response.content), "r").convert('RGB')
|
||||
pattern.save('output.jpg')
|
||||
|
||||
media_dict = mastodon.media_post("output.jpg")
|
||||
return media_dict;
|
||||
|
||||
cleanr = re.compile('<.*?>')
|
||||
def cleanhtml(raw_html):
|
||||
cleanr = re.compile('<.*?>')
|
||||
cleantext = re.sub(cleanr, '', raw_html)
|
||||
return cleantext
|
||||
|
||||
class BotListener(StreamListener):
|
||||
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
|
||||
# use only notification
|
||||
def on_notification(self, notification):
|
||||
# use only notification
|
||||
def on_notification(self, notification):
|
||||
|
||||
# catch only mention in notification
|
||||
if notification['type'] == 'mention':
|
||||
log.debug("Got a mention")
|
||||
if notification["account"]["bot"] == False:
|
||||
sender = notification['account']['acct'] # Get sender name
|
||||
if sender in BLACKLIST:
|
||||
log.info("Service refused to %s" % sender)
|
||||
return
|
||||
sender_hour_filename = "limiter/hour/" + sender; # Forge file for limiter
|
||||
sender_minute_filename = "limiter/minute/" + sender; # Forge file for limiter
|
||||
# catch only mention in notification
|
||||
if notification['type'] == 'mention':
|
||||
log.debug("Got a mention")
|
||||
if notification["account"]["bot"] == False:
|
||||
sender = notification['account']['acct'] # Get sender name
|
||||
if sender in BLACKLIST:
|
||||
log.info("Service refused to %s" % sender)
|
||||
return
|
||||
sender_hour_filename = "limiter/hour/" + sender; # Forge file for limiter
|
||||
sender_minute_filename = "limiter/minute/" + sender; # Forge file for limiter
|
||||
|
||||
if os.path.isfile(sender_hour_filename): # Check if file exist
|
||||
log.debug("Sender file exist")
|
||||
statbuf = os.stat(sender_hour_filename)
|
||||
last_edit = int(statbuf.st_mtime)
|
||||
ts = int(time.time())
|
||||
if ts - last_edit > 3599: # check if file is modified 1 hour after last edition
|
||||
log.debug("file is too old")
|
||||
f = open(sender_hour_filename,'w')
|
||||
f.write(str(1)) # reset counter
|
||||
f.close()
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("file is young")
|
||||
f = open(sender_hour_filename,'r+')
|
||||
limit = int(get_parameter("limit_hour", config_file))
|
||||
number_of_mention = int(f.read())
|
||||
if number_of_mention < limit: # limit of mention per hour is limit_hour
|
||||
log.debug("Sender have less of limit requests")
|
||||
f.seek(0)
|
||||
f.write(str(number_of_mention + 1))
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("Sender have more of limit requests")
|
||||
can_continue = False # if number of mention is for, user can't receive anything
|
||||
f.close()
|
||||
else: # File not exist, create it and initialise it
|
||||
log.debug("Sender file not exist")
|
||||
f = open(sender_hour_filename,"w+")
|
||||
f.write(str(1))
|
||||
f.close()
|
||||
can_continue = True
|
||||
if os.path.isfile(sender_hour_filename): # Check if file exist
|
||||
log.debug("Sender file exist")
|
||||
statbuf = os.stat(sender_hour_filename)
|
||||
last_edit = int(statbuf.st_mtime)
|
||||
ts = int(time.time())
|
||||
if ts - last_edit > 3599: # check if file is modified 1 hour after last edition
|
||||
log.debug("file is too old")
|
||||
f = open(sender_hour_filename,'w')
|
||||
f.write(str(1)) # reset counter
|
||||
f.close()
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("file is young")
|
||||
f = open(sender_hour_filename,'r+')
|
||||
limit = int(get_parameter("limit_hour", config_file))
|
||||
number_of_mention = int(f.read())
|
||||
if number_of_mention < limit: # limit of mention per hour is limit_hour
|
||||
log.debug("Sender have less of limit requests")
|
||||
f.seek(0)
|
||||
f.write(str(number_of_mention + 1))
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("Sender have more of limit requests")
|
||||
can_continue = False # if number of mention is for, user can't receive anything
|
||||
f.close()
|
||||
else: # File not exist, create it and initialise it
|
||||
log.debug("Sender file not exist")
|
||||
f = open(sender_hour_filename,"w+")
|
||||
f.write(str(1))
|
||||
f.close()
|
||||
can_continue = True
|
||||
|
||||
if can_continue:
|
||||
if os.path.isfile(sender_minute_filename): # Check if file exist
|
||||
log.debug("Sender file exist")
|
||||
statbuf = os.stat(sender_minute_filename)
|
||||
last_edit = int(statbuf.st_mtime)
|
||||
ts = int(time.time())
|
||||
if ts - last_edit > 59: # check if file is modified 1 minute after last edition
|
||||
log.debug("file is too old")
|
||||
f = open(sender_minute_filename,'w')
|
||||
f.write(str(1)) # reset counter
|
||||
f.close()
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("file is young")
|
||||
f = open(sender_minute_filename,'r+')
|
||||
limit = int(get_parameter("limit", config_file))
|
||||
number_of_mention = int(f.read())
|
||||
if number_of_mention < limit: # limit of mention per minute is 4
|
||||
log.debug("Sender have less of limit requests")
|
||||
f.seek(0)
|
||||
f.write(str(number_of_mention + 1))
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("Sender have more of limit requests")
|
||||
can_continue = False # if number of mention is for, user can't receive anything
|
||||
file = open(sender_hour_filename,'r+')
|
||||
number_of_mention = int(file.read())
|
||||
file.seek(0)
|
||||
file.write(str(number_of_mention - 1))
|
||||
file.close()
|
||||
f.close()
|
||||
else: # File not exist, create it and initialise it
|
||||
log.debug("Sender file not exist")
|
||||
f = open(sender_minute_filename,"w+")
|
||||
f.write(str(1))
|
||||
f.close()
|
||||
can_continue = True
|
||||
if can_continue:
|
||||
if os.path.isfile(sender_minute_filename): # Check if file exist
|
||||
log.debug("Sender file exist")
|
||||
statbuf = os.stat(sender_minute_filename)
|
||||
last_edit = int(statbuf.st_mtime)
|
||||
ts = int(time.time())
|
||||
if ts - last_edit > 59: # check if file is modified 1 minute after last edition
|
||||
log.debug("file is too old")
|
||||
f = open(sender_minute_filename,'w')
|
||||
f.write(str(1)) # reset counter
|
||||
f.close()
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("file is young")
|
||||
f = open(sender_minute_filename,'r+')
|
||||
limit = int(get_parameter("limit", config_file))
|
||||
number_of_mention = int(f.read())
|
||||
if number_of_mention < limit: # limit of mention per minute is 4
|
||||
log.debug("Sender have less of limit requests")
|
||||
f.seek(0)
|
||||
f.write(str(number_of_mention + 1))
|
||||
can_continue = True
|
||||
else:
|
||||
log.debug("Sender have more of limit requests")
|
||||
can_continue = False # if number of mention is for, user can't receive anything
|
||||
file = open(sender_hour_filename,'r+')
|
||||
number_of_mention = int(file.read())
|
||||
file.seek(0)
|
||||
file.write(str(number_of_mention - 1))
|
||||
file.close()
|
||||
f.close()
|
||||
else: # File not exist, create it and initialise it
|
||||
log.debug("Sender file not exist")
|
||||
f = open(sender_minute_filename,"w+")
|
||||
f.write(str(1))
|
||||
f.close()
|
||||
can_continue = True
|
||||
|
||||
if can_continue:
|
||||
id = notification['status']['id']
|
||||
visibility = notification['status']['visibility']
|
||||
if visibility == 'public':
|
||||
visibility = 'unlisted'
|
||||
mentions = notification['status']['mentions']
|
||||
text = "@" + notification['status']["account"]["acct"] + " "
|
||||
for mention in mentions:
|
||||
if mention["acct"] != get_parameter("name_bot", config_file):
|
||||
text = text + "@" + mention["acct"] + " "
|
||||
if can_continue:
|
||||
id = notification['status']['id']
|
||||
visibility = notification['status']['visibility']
|
||||
if visibility == 'public':
|
||||
visibility = 'unlisted'
|
||||
mentions = notification['status']['mentions']
|
||||
text = "@" + notification['status']["account"]["acct"] + " "
|
||||
for mention in mentions:
|
||||
if mention["acct"] != get_parameter("name_bot", config_file):
|
||||
text = text + "@" + mention["acct"] + " "
|
||||
|
||||
if get_parameter("sensitive", config_file) == "yes":
|
||||
sensitive = True
|
||||
else:
|
||||
sensitive = False
|
||||
if get_parameter("sensitive", config_file) == "yes":
|
||||
sensitive = True
|
||||
else:
|
||||
sensitive = False
|
||||
|
||||
if self.args.source == "local":
|
||||
media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
elif self.args.source == "distant":
|
||||
media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
if self.args.source == "local":
|
||||
media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
elif self.args.source == "distant":
|
||||
media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
|
||||
mastodon.status_post(text, id, media_ids=[media_dict], sensitive=sensitive, visibility=visibility, spoiler_text=get_parameter("spoiler_text", config_file))
|
||||
else:
|
||||
log.debug("no picture send :(")
|
||||
pass
|
||||
else:
|
||||
log.debug("Nevermind")
|
||||
mastodon.status_post(text, id, media_ids=[media_dict], sensitive=sensitive, visibility=visibility, spoiler_text=get_parameter("spoiler_text", config_file))
|
||||
else:
|
||||
log.debug("no picture send :(")
|
||||
pass
|
||||
else:
|
||||
log.debug("Nevermind")
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser(description='Choose between image or streaming')
|
||||
parser.add_argument("-i", "--img", action='store_true', help="post image")
|
||||
parser.add_argument("-s", "--source", help="Source of image [ local | distant ]")
|
||||
parser.add_argument("--stream", action="store_true", help="stream user profile")
|
||||
args = parser.parse_args()
|
||||
if args.img:
|
||||
if args.source == "local":
|
||||
media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
elif args.source == "distant":
|
||||
media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
parser = argparse.ArgumentParser(description='Choose between image or streaming')
|
||||
parser.add_argument("-i", "--img", action='store_true', help="post image")
|
||||
parser.add_argument("-s", "--source", help="Source of image [ local | distant ]")
|
||||
parser.add_argument("--stream", action="store_true", help="stream user profile")
|
||||
args = parser.parse_args()
|
||||
if args.img:
|
||||
if args.source == "local":
|
||||
media_dict = post_img_local(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
elif args.source == "distant":
|
||||
media_dict = post_img_distant(mastodon, get_parameter("default_text", config_file), log, config_file)
|
||||
|
||||
if get_parameter("sensitive", config_file) == "yes":
|
||||
sensitive = True
|
||||
else:
|
||||
sensitive = False
|
||||
mastodon.status_post(get_parameter("default_text", config_file), None, media_ids=[media_dict], sensitive=sensitive, visibility='public', spoiler_text=get_parameter("spoiler_text", config_file))
|
||||
sys.exit()
|
||||
if get_parameter("sensitive", config_file) == "yes":
|
||||
sensitive = True
|
||||
else:
|
||||
sensitive = False
|
||||
mastodon.status_post(get_parameter("default_text", config_file), None, media_ids=[media_dict], sensitive=sensitive, visibility='public', spoiler_text=get_parameter("spoiler_text", config_file))
|
||||
sys.exit()
|
||||
|
||||
elif args.stream:
|
||||
stream = BotListener(args);
|
||||
while True:
|
||||
try:
|
||||
log.info("Start listening...")
|
||||
mastodon.stream_user(stream)
|
||||
except Exception as error:
|
||||
log.warning('General exception caught: ' + str(error))
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
print("Require an argument. Use --help to display help")
|
||||
elif args.stream:
|
||||
stream = BotListener(args);
|
||||
while True:
|
||||
try:
|
||||
log.info("Start listening...")
|
||||
mastodon.stream_user(stream)
|
||||
except Exception as error:
|
||||
log.warning('General exception caught: ' + str(error))
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
print("Require an argument. Use --help to display help")
|
||||
|
||||
main()
|
||||
|
||||
|
|
Reference in a new issue