"""my-utils

functions I use across many projects

26 Jul 2022
"""

def get_email(res):
    """Extract every email-looking string from a response body.

    res: any object exposing a ``.text`` attribute (e.g. a requests
    Response). Returns the matches in order of appearance (duplicates kept).
    """
    import re
    pattern = re.compile(r"[a-z0-9.\-+_]+@[a-z0-9.\-+_]+\.[a-z]+", re.I)
    return pattern.findall(res.text)
def is_website_root(url):
    """Return True when *url* has no path component beyond an optional '/'."""
    from urllib.parse import urlparse
    path = urlparse(url).path
    return path in ('', '/')
def get_website_root(url):
    """Return ``scheme://netloc`` for *url*.

    Raises KeyError (kept from the original contract) when the URL carries
    no scheme.
    """
    from urllib.parse import urlparse
    parsed = urlparse(url)
    if not parsed.scheme:
        raise KeyError('Invalid URL, must include http or https scheme')
    return f"{parsed.scheme}://{parsed.netloc}"
def is_valid_email(email, website):
    """Loosely check that *email*'s domain contains *website*'s domain.

    The website domain is taken from netloc, falling back to the path for
    scheme-less input; matching is a case-insensitive substring test.

    FIX: a malformed *email* without an '@' previously raised IndexError;
    it now returns False.
    """
    from urllib.parse import urlparse
    site = urlparse(website)
    domain = site.netloc if site.netloc else site.path.strip('/')
    parts = email.split('@', 1)
    if len(parts) != 2:  # no '@' at all — cannot be a valid address
        return False
    return domain.lower() in parts[1].lower()
from datetime import datetime
import os
# Startup banner: timestamp + script basename (relies on __file__, so the
# module must be loaded from disk, not pasted into a REPL)
print(f"\n>>>\n{datetime.now().strftime('%H:%M:%S')} starting {os.path.basename(__file__)}")
import time
start_time = time.time()  # wall-clock reference for whole-run timing

# NOTE(review): datetime and time are re-imported below — harmless but redundant.
import ssl
import tldextract
from urllib.parse import urlparse
import csv
import pprint
import urllib3
from urllib3.exceptions import HTTPError
import urllib
import requests
import re
import httplib2
from datetime import datetime
import time
import json
import string
import sqlite3

from dotenv import load_dotenv
load_dotenv()  # pull variables from a local .env into the environment

pp = pprint.PrettyPrinter(indent=4)  # shared pretty-printer for debugging

####################
# OPEN DB CONNECTION

# Shared module-level SQLite connection/cursor used by the COUNTRY section
# below. NOTE(review): hard-coded absolute path; the connection is never
# closed explicitly (it lives for the process lifetime).
db = sqlite3.connect('/path/to/local/sqlite/database/database.db')
c = db.cursor()


####################
# COUNTRY

# Build the various Country dicts needed by the functions, with local DB
dict_country = {'UK': 'UK'}           # country name -> code (seeded with UK)
dict_country_extended = {'UK': 'UK'}  # rebuilt from another table below
dict_EU = {}                          # code -> bool (EU membership)
dict_ID = {}                          # code -> DB rowid
c.execute("""
        SELECT rowid, code, name, eu
        FROM countries;
        """)
for row in c.fetchall():
    id = row[0]  # NOTE(review): shadows the builtin `id`
    code = row[1].strip()
    name = row[2].strip()
    eu = row[3]
    # `eu` is stored as text 'true'/'false'; coerce to bool
    if eu.lower() == 'true':
        eu = True
    else:
        eu = False
    dict_country[name] = code
    dict_country_extended[name] = code
    dict_EU[code] = eu
    dict_ID[code] = id

# dict_country_extended
# NOTE(review): this REBINDS dict_country_extended to string -> country_code
# pairs from countries_extended, discarding the name -> code entries added
# in the loop above (including the 'UK' seed) — confirm this is intentional.
c.execute("""
        SELECT string, country_code
        FROM countries_extended;
        """)
dict_country_extended = {x[0]:x[1] for x in c.fetchall()}

def country_code_from_tld(tld):
    """Look up the country code for a TLD via the shared DB cursor.

    Re-queries the `tlds` table on every call; raises KeyError for an
    unknown TLD.
    """
    global c
    c.execute("""
        SELECT tld, country_code
        FROM tlds;
        """)
    mapping = dict(c.fetchall())
    return mapping[tld]

def get_country_code(country_name):
    """Resolve a country name to its code via dict_country_extended.

    'UK' is special-cased (and is the only branch that logs timing);
    unknown names fall through and return None.
    """
    start_func = time.time()

    global dict_country_extended

    if country_name == 'UK':
        run_time_func = round((time.time() - start_func), 2)
        print(f'\n{run_time_func}s for func: get_country_code\n')
        return 'UK'

    if country_name in dict_country_extended:
        return dict_country_extended[country_name]

def get_list_country_codes():
    """Return the name -> code mapping (despite the name, this is the
    dict_country dict itself, not a list)."""
    start_func = time.time()
    result = dict_country

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: get_list_country_codes\n')
    return result

def get_country_id_from_code(code):
    """Return the DB rowid for a country *code* (KeyError if unknown)."""
    return dict_ID[code]

def get_country_code_from_id(id):
    """Reverse lookup: country code whose rowid equals *id*; None if absent."""
    return next((code for code, rowid in dict_ID.items() if rowid == id), None)

def is_in_EU(country_code):
    """True when *country_code* is flagged EU in dict_EU (KeyError if unknown)."""
    return dict_EU[country_code]

def non_eu_tlds():
    """Read tlds.csv and return the list of TLDs whose `eu` column is the
    literal string 'False'."""
    with open('/path/to/file/tlds.csv', 'r', newline='', encoding='UTF-8') as i:
        rows = list(csv.reader(i, delimiter=","))
    return [row[0].strip() for row in rows if row[2] == 'False']

def find_country_code(input_text):
    """Return the code of the first country name found (case-insensitively)
    inside *input_text*; None when none matches."""
    haystack = input_text.lower()
    for name, code in dict_country_extended.items():
        if name.lower() in haystack:
            return code

def get_call_dict():
    """Load the calling-code -> country-code mapping from country_call.csv
    (header row skipped)."""
    start_func = time.time()
    dict_call_code = {}
    with open('/path/to/file/country_call.csv', 'r', newline='', encoding='UTF-8') as h:
        reader = csv.reader(h, delimiter=",")
        next(reader)  # skip the header
        for row in reader:
            dict_call_code[row[0]] = row[1]

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: get_call_dict\n')
    return dict_call_code

def country_by_email_domain(email):
    """Guess a country code from an email address's domain.

    First pass looks for a 'cc.' prefix on the domain, second pass for a
    '.cc' suffix; returns '' when nothing matches.

    FIX: the original reset ``country`` to '' on every NON-matching dict
    entry, so only the last entry of the country dict ever counted (and the
    name would be unbound for an empty dict); the first match now wins.
    """
    start_func = time.time()
    full_domain = email.split('@')[1]

    country = ''
    codes = get_list_country_codes().values()
    for code in codes:
        if full_domain.startswith(f"{code.lower()}."):
            country = code
            break
    if country == '':
        for code in codes:
            if full_domain.endswith(f".{code.lower()}"):
                country = code
                break

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: country_by_email_domain\n')
    return country

def country_by_phone(phone_as_string):
    """Guess a country code from a phone number string.

    Pass 1: international prefixes (+NN, + NN, 00NN, 00 NN, ++NN) — the
    LAST matching dict entry wins, matching the original dict-order
    behaviour. Pass 2: US-style local formats. Pass 3: bare country code at
    the start (rejecting a doubled code). Returns '' when nothing matches.
    """
    start_func = time.time()
    phone = phone_as_string.strip()
    dict_call_code = get_call_dict()
    country = ''

    for code, cc in dict_call_code.items():
        prefixes = (f"+{code}", f"+ {code}", f"00{code}", f"00 {code}", f"++{code}")
        if phone.startswith(prefixes):
            country = cc

    if country == '':
        # (123) / 123- / 123. at the start look like US local formats
        if re.search(r'^\(\d{3}\)|^\d{3}\-|^\d{3}\.', phone) is not None:
            country = 'US'

    # Second pass on country_call list for numbers left, starting just with country code
    if country == '':
        for code, cc in dict_call_code.items():
            if phone.startswith(f"{code}") and not phone.startswith(f"{code}{code}"):
                country = cc

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: country_by_phone\n')
    return country


def first_from_email(email):
    """Best-effort first-name extraction from an email address.

    Returns:
        'team' for known team addresses ('hi@' or any prefix containing an
               entry from the team_emails table),
        a capitalised first name when one can be derived from the prefix
        (separator split, then exact match, then longest prefix match
        against the `first` DB table),
        ''     when nothing matches.

    FIXES vs original:
    - the 'hi' check was a stand-alone `if` not chained to the team
      `if/else`, so hi@ addresses fell into the name-extraction branch and
      the 'team' result could be overwritten; both checks are now one
      condition;
    - the third sqlite path was missing its leading '/' (it silently opened
      a relative-path DB);
    - the third DB connection was never closed (leak).
    """
    start_func = time.time()

    email_prefix = email.split('@')[0].lower()
    first = ''

    # Gather team emails
    db = sqlite3.connect('/path/to/file/database.db')
    c = db.cursor()
    c.execute("""
            SELECT email
            FROM team_emails;
            """)
    team_emails = [x[0] for x in c.fetchall()]
    db.close()

    # Known team address? (substring match, prefix already lowercased)
    if email_prefix == 'hi' or any(ele.lower() in email_prefix for ele in team_emails):
        first = 'team'
    else:
        # first.last@ / first_last@ style prefixes ('_' overrides '.' when
        # both occur, matching the original evaluation order)
        for sep in ('.', '_'):
            if sep in email_prefix:
                pre_prefix = email_prefix.split(sep)[0]
                if len(pre_prefix) > 1:
                    hyphen_sep = '-' if '-' in pre_prefix else None
                    first = string.capwords(pre_prefix, sep=hyphen_sep)

        if first == '':
            # Catch first@ — exact (case-insensitive) match in the first table
            db = sqlite3.connect('/path/to/file/database.db')
            c = db.cursor()
            c.execute("""
                    SELECT first
                    FROM first
                    WHERE lower(first) LIKE ?;
                    """,
                    (f"{email_prefix}",)
                    )
            potential_first = [x[0] for x in c.fetchall()]
            db.close()

            if potential_first:
                first = str(potential_first[0])

        if first == '':
            # Catch firstlast@ — longest known first name that prefixes it
            db = sqlite3.connect('/path/to/file/database.db')  # FIX: was a relative path
            c = db.cursor()
            c.execute("""
                    SELECT first
                    FROM first;
                    """
                    )
            candidates = [x[0] for x in c.fetchall()
                          if email_prefix.startswith(x[0].lower())]
            db.close()  # FIX: this connection was leaked in the original

            names = [n for n in candidates if isinstance(n, str)]
            first = max(names, key=len) if names else ''

        run_time_func = round((time.time() - start_func), 2)
        if run_time_func > 2:
            print(f'\n{run_time_func}s for func: first_from_email\n')
    return first


## Twitter

def handle_from_long_url(long_social_url):
    """Extract a social-media handle from a profile URL.

    Takes the URL path, strips every '/', and removes a literal
    'twitter.com' if the path itself contained the host.
    """
    parsed = urlparse(long_social_url)
    handle = parsed.path.replace('/', '')
    return handle.replace('twitter.com', '') if 'twitter.com' in handle else handle

## Email

def email_to_pattern(email, firstname, lastname):
    """Convert a known email into its naming pattern.

    e.g. ('john.doe@x.com', 'John', 'Doe') -> 'first.last'
         ('jdoe@x.com',     'John', 'Doe') -> 'flast'

    FIX: removed ~15 lines that computed a local ``domain`` string which was
    never used afterwards (dead code); the return value is unchanged. As a
    side effect of the simplification, an address without '@' no longer
    raises IndexError — the whole string is treated as the prefix.
    """
    start_func = time.time()
    firstname = firstname.lower()
    lastname = lastname.lower()

    prefix = email.strip().split("@")[0]

    # 'jdoe' -> 'fdoe', so the lastname pass below yields 'flast'
    initials = firstname[0] + lastname[0]
    if initials in prefix:
        prefix = prefix.replace(initials, f"f{lastname[0]}")

    if firstname in prefix:
        prefix = prefix.replace(firstname, "first")
    if lastname in prefix:
        prefix = prefix.replace(lastname, "last")

    # Initial + dot forms: j.doe -> f.last
    if firstname[0] + '.' in prefix:
        prefix = prefix.replace(firstname[0] + '.', "f.")
    if lastname[0] + "." in prefix:
        prefix = prefix.replace(lastname[0] + ".", "l.")

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: email_to_pattern\n')
    return prefix


####################
# CLEANING

def clean_long_url(long_url):
    """Strip query string and fragment, keeping scheme, host and path."""
    parsed = urlparse(long_url)
    return "".join((parsed.scheme, "://", parsed.netloc, parsed.path))

def clean_website_url(website_url):
    """Normalise to a lowercase https:// root URL, keeping any subdomain."""
    ext = tldextract.extract(website_url)
    host = f"{ext.domain}.{ext.suffix}"
    if ext.subdomain != '':
        host = f"{ext.subdomain}.{host}"
    return f"https://{host}".lower()

# Socials

# Substrings that disqualify a scraped social URL (substring-matched against
# the cleaned handle); shared by all the clean_*_url helpers below.
blacklist_socials = [
    'dialog',
    'privacy',
    'login',
    '.jpg',
    '.png',
    ]

def clean_facebook_url(fb, blacklist=None):
    """Normalise a scraped Facebook URL to https://www.facebook.com/<handle>.

    fb: raw URL string. blacklist: optional iterable of disqualifying
    substrings (defaults to the module-level blacklist_socials).
    Returns the cleaned URL, or None for root/blacklisted/path-less links;
    non-Facebook input logs a message and returns None.

    FIX: the original raised IndexError on facebook.com URLs with no '/'
    after the host (e.g. 'https://facebook.com?ref=x'); such links are now
    dropped like root links.
    """
    if blacklist is None:
        blacklist = blacklist_socials

    if 'facebook' not in fb:
        return print(f"NOT a Facebook link: {fb}")

    # Root links carry no handle
    if fb.endswith(('facebook.com/', 'facebook.com', 'facebook.com#', 'facebook.com/#')):
        print(f"Removing entry for: {fb}")
        return None

    parts = fb.split('facebook.com/')
    if len(parts) < 2:  # no path component after the host
        print(f"Removing entry for: {fb}")
        return None

    handle = parts[1]
    handle = handle.split('?')[0]  # drop query string
    if handle.endswith('/'):
        handle = handle[:-1]
    if any(ele in handle for ele in blacklist):
        return None
    handle = handle.replace('\t', '').strip()
    return f"https://www.facebook.com/{handle}"

def clean_twitter_url(tt, blacklist=None):
    """Normalise a scraped Twitter URL to https://twitter.com/<handle>.

    tt: raw URL string. blacklist: optional iterable of disqualifying
    substrings (defaults to the module-level blacklist_socials).
    Returns the cleaned URL, or None for root/blacklisted/path-less links;
    non-Twitter input logs a message and returns None.

    FIX: the original raised IndexError on twitter.com URLs with no '/'
    after the host; such links are now dropped like root links.
    """
    if blacklist is None:
        blacklist = blacklist_socials

    if 'twitter' not in tt:
        return print(f"NOT a Twitter link: {tt}")

    # Root links carry no handle
    if tt.endswith(('twitter.com/', 'twitter.com', 'twitter.com#', 'twitter.com/#')):
        print(f"Removing entry for: {tt}")
        return None

    parts = tt.split('twitter.com/')
    if len(parts) < 2:  # no path component after the host
        print(f"Removing entry for: {tt}")
        return None

    handle = parts[1]
    handle = handle.split('?')[0]  # drop query string
    if handle.endswith('/'):
        handle = handle[:-1]
    if any(ele in handle for ele in blacklist):
        return None
    handle = handle.replace('\t', '').strip()
    return f"https://twitter.com/{handle}"


def clean_instagram_url(it, blacklist=None):
    """Normalise a scraped Instagram URL to https://www.instagram.com/<handle>.

    it: raw URL string. blacklist: optional iterable of disqualifying
    substrings (defaults to the module-level blacklist_socials).
    Returns the cleaned URL, or None for root/blacklisted/path-less links;
    non-Instagram input logs a message and returns None.

    FIX: the original raised IndexError on instagram.com URLs with no '/'
    after the host; such links are now dropped like root links.
    """
    if blacklist is None:
        blacklist = blacklist_socials

    if 'instagram' not in it:
        return print(f"NOT a Instagram link: {it}")

    # Root links carry no handle
    if it.endswith(('instagram.com/', 'instagram.com', 'instagram.com#', 'instagram.com/#')):
        print(f"Removing entry for: {it}")
        return None

    parts = it.split('instagram.com/')
    if len(parts) < 2:  # no path component after the host
        print(f"Removing entry for: {it}")
        return None

    handle = parts[1]
    handle = handle.split('?')[0]  # drop query string
    if handle.endswith('/'):
        handle = handle[:-1]
    if any(ele in handle for ele in blacklist):
        return None
    handle = handle.replace('\t', '').strip()
    return f"https://www.instagram.com/{handle}"

def clean_youtube_url(yt, blacklist=None):
    """Normalise a scraped YouTube URL to https://www.youtube.com/<handle>.

    yt: raw URL string. blacklist: optional iterable of disqualifying
    substrings (defaults to the module-level blacklist_socials); 'embed/'
    links are always dropped in addition to the blacklist.
    Returns the cleaned URL, or None for root/blacklisted/path-less links;
    non-YouTube input logs a message and returns None.

    FIXES: the original appended 'embed/' to the SHARED blacklist_socials
    list on every call, growing it without bound and silently changing the
    behaviour of the other clean_*_url helpers; the extra entry is now kept
    local. Also guards the IndexError on youtube.com URLs with no '/' after
    the host.
    """
    if blacklist is None:
        blacklist = blacklist_socials
    blacklist = list(blacklist) + ['embed/']  # local copy — never mutate the shared list

    if 'youtube' not in yt:
        return print(f"NOT a youtube link: {yt}")

    # Root links carry no handle
    if yt.endswith(('youtube.com/', 'youtube.com', 'youtube.com#', 'youtube.com/#')):
        print(f"Removing entry for: {yt}")
        return None

    parts = yt.split('youtube.com/')
    if len(parts) < 2:  # no path component after the host
        print(f"Removing entry for: {yt}")
        return None

    handle = parts[1]
    handle = handle.split('?')[0]  # drop query string
    if handle.endswith('/'):
        handle = handle[:-1]
    if any(ele in handle for ele in blacklist):
        return None
    handle = handle.replace('\t', '').strip()
    return f"https://www.youtube.com/{handle}"

# URLs

def domain_from_url(url):
    """Return the registered domain (domain.suffix) of *url*, lowercased,
    with any stray 'www.' fragment removed."""
    ext = tldextract.extract(url)
    return f"{ext.domain}.{ext.suffix}".lower().replace('www.', '')

def domain_from_email(email):
    """Return the registered domain of an email address, lowercased and
    without a 'www.' fragment; input without '@' is returned unchanged."""
    parts = email.strip().split('@')
    if len(parts) < 2:
        return email
    ext = tldextract.extract(parts[1])
    return f"{ext.domain}.{ext.suffix}".lower().replace('www.', '')

def suffix_from_domain(domain):
    """Return the public suffix of *domain*, dot-prefixed (e.g. '.co.uk')."""
    return "." + tldextract.extract(domain).suffix

def url_from_domain(domain):
    """Resolve a bare domain to the URL it actually serves.

    Tries a plain HTTPS GET via httplib2 first (fast path, reads the
    'content-location' response header), then falls back to headless
    Selenium Chrome to follow redirects/JS. Returns '' when both fail.

    FIXES vs original:
    - a second webdriver.Chrome was created immediately after the first,
      leaking the first instance and discarding its page-load timeout;
    - the driver was never quit (Chrome process leak); it is now created
      lazily (only on the fallback path) and always cleaned up;
    - bare `except:` replaced with `except Exception:`.
    """
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service

    start_func = time.time()

    def _log_elapsed():
        # Shared timing log for every exit path.
        print(f'{round((time.time() - start_func), 2)}s for func: url_from_domain')

    try:
        print(datetime.now().strftime('%H:%M:%S'), " - testing with httplib2")
        h = httplib2.Http(".cache", timeout=4)
        response_headers = dict(h.request(f"https://{domain}", "GET")[0])
        url = response_headers['content-location']
        print(datetime.now().strftime('%H:%M:%S'), ' - ', url)
        _log_elapsed()
        return url
    except Exception:
        pass  # fall through to the browser-based attempt

    try:
        # Need the matching ChromeDriver first:
        # https://chromedriver.chromium.org/downloads
        s = Service('/path/to/file/chromedriver')
        opts = webdriver.ChromeOptions()
        opts.add_argument("--headless")
        driver = webdriver.Chrome(service=s, options=opts)
        driver.set_page_load_timeout(30)
        try:
            print(datetime.now().strftime('%H:%M:%S'), f" - testing {domain} with Selenium")
            driver.get(domain)
            final_url = driver.current_url
            print(final_url)
            _log_elapsed()
            return final_url
        finally:
            driver.quit()  # FIX: the original never released the browser
    except Exception:
        _log_elapsed()
        return ''

# links

# social