Skip to content

Commit

Permalink
added main to kaf.py and added reqs
Browse files Browse the repository at this point in the history
  • Loading branch information
TattiQ committed Jun 29, 2017
1 parent ba33e85 commit ef46b6b
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 13 deletions.
99 changes: 99 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Created by .ignore support plugin (hsz.mobi)
### Example user template template
### Example user template

# IntelliJ project files
.idea
*.iml
out
gen### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# IPython Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# dotenv
.env

# virtualenv
venv/
ENV/

# Spyder project settings
.spyderproject

# Rope project settings
.ropeproject

68 changes: 55 additions & 13 deletions kaf.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,41 @@
import requests
from pykafka import *
from pykafka.common import OffsetType
import sys
import getopt
import time
import datetime


def consume_mes():
client = KafkaClient("10.13.0.4:9092")
topic = client.topics['test1']
def consume_mes(incoming_topic_name, kafka_host_ip="localhost:9092"):
"""
:param incoming_topic_name:
:param kafka_host_ip:
:return:
"""

client = KafkaClient(kafka_host_ip)
topic = client.topics[incoming_topic_name]

consumer = topic.get_simple_consumer(
consumer_group="my-group0",
auto_offset_reset=OffsetType.EARLIEST,
reset_offset_on_start=True,
consumer_timeout_ms=500
)
)
lst = []
for message in consumer:
i = ("% s" % message.value)
lst.append(i)
return lst



def sf_query(lst):
def sf_query(lst, creds):
session = requests.Session()
sf = Salesforce(username='', password='', security_token='',
sf = Salesforce(username=creds.get("username"), password=creds.get("password"),
security_token=creds.get("security_token"),
sandbox=False, session=session)
force_out = []
list_results = []
for i in lst:
res = sf.query("SELECT Account.Name, CaseNumber FROM Case WHERE CaseNumber = '%s'" % i)
if res:
Expand All @@ -35,12 +45,44 @@ def sf_query(lst):
return force_out



def kafka_prod(force_out):
client = KafkaClient(hosts='10.13.0.4:9092', use_greenlets=True)
topic = client.topics['SF_query_results']
res = reduce(lambda x,y: x+y,force_out)
def kafka_prod(force_out, outgoing_topic_name, kafka_host_ip="localhost:9092"):
client = KafkaClient(hosts=kafka_host_ip, use_greenlets=True)
topic = client.topics[outgoing_topic_name]
res = reduce(lambda x, y: x + y, force_out)
with topic.get_sync_producer() as producer:
for i in res:
producer.produce(str(i[1]) + ";" + str(i[0]))


def main(argv):
args_keys = ["incoming_topic=", "sf_username=", "sf_pass=", "sf_token=", "outgoing_topic="]
args_usage_message = "Usage: kaf.py " \
" --incoming_topic=<incoming_kafka_topic> " \
"--sf_username=<salesforce_user> " \
"--sf_pass=<salesforce_password> " \
"--sf_token=<salesforce_security_token> " \
"--outgoing_topic=<outgoing_kafka_topic>"

try:
opts, args = getopt.getopt(argv, None, args_keys)
except getopt.GetoptError:
print(args_usage_message)
sys.exit(2)

if len(opts) <= 1:
print(args_usage_message)
sys.exit(2)

safe_args = {k: v for k, v in opts}
print("[{0}] kaf.py: Started with arguments {1}".format(datetime.utcnow(), safe_args))
creds = {'username': safe_args.get("--sf_username"), 'password': safe_args.get("--sf_pass"),
'security_token': safe_args.get("--sf_token")}

cases_to_lookup = consume_mes(incoming_topic_name=safe_args.get("--incoming_topic", "sf_kaf_in"))
sf_acc_name_list = sf_query(lst=cases_to_lookup, creds=creds)

kafka_prod(force_out=sf_acc_name_list, outgoing_topic_name=safe_args.get("--outgoing_topic", "sf_kaf_out"))


if __name__ == '__main__':
main(sys.argv[1:])
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
requests
simple_salesforce
pykafka

0 comments on commit ef46b6b

Please sign in to comment.